Dateien nach "custom_components/bahmcloud_store" hochladen

This commit is contained in:
2026-01-15 14:08:13 +00:00
parent 4ff94bc185
commit 1fc274bf7c
5 changed files with 587 additions and 0 deletions

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
import logging
from datetime import timedelta
from homeassistant.core import HomeAssistant
from homeassistant.const import Platform
from homeassistant.helpers.discovery import async_load_platform
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.components.panel_custom import async_register_panel
from .core import BCSCore, BCSConfig, BCSError
_LOGGER = logging.getLogger(__name__)
# Integration domain key used in configuration.yaml and hass.data.
DOMAIN = "bahmcloud_store"
# Default remote store index (a store.yaml served raw from a Gitea instance).
DEFAULT_STORE_URL = "https://git.bahmcloud.de/bahmcloud/ha_store/raw/branch/main/store.yaml"
# YAML config option letting users point at an alternative store index.
CONF_STORE_URL = "store_url"
async def async_setup(hass: HomeAssistant, config: dict) -> bool:
    """Set up the Bahmcloud Store integration from YAML configuration.

    Registers the HTTP views and sidebar panel, performs a best-effort
    initial index refresh, schedules periodic refreshes, and loads the
    update platform. Always returns True so HA considers setup done.
    """
    # Fix: config.get(DOMAIN, {}) returns None (not {}) when the user has a
    # bare `bahmcloud_store:` key in configuration.yaml, which would crash on
    # the following .get() call. `or {}` covers that case.
    cfg = config.get(DOMAIN) or {}
    store_url = cfg.get(CONF_STORE_URL, DEFAULT_STORE_URL)
    core = BCSCore(hass, BCSConfig(store_url=store_url))
    hass.data[DOMAIN] = core
    await core.register_http_views()
    # RESTORE: keep the module_url pattern that worked for you
    await async_register_panel(
        hass,
        frontend_url_path="bahmcloud-store",
        webcomponent_name="bahmcloud-store-panel",
        module_url="/api/bahmcloud_store_static/panel.js?v=42",
        sidebar_title="Bahmcloud Store",
        sidebar_icon="mdi:store",
        require_admin=True,
        config={},
    )
    # Best-effort initial refresh; the periodic task below retries later.
    try:
        await core.refresh()
    except BCSError as e:
        _LOGGER.error("Initial refresh failed: %s", e)

    async def periodic(_now) -> None:
        # Refresh the index and notify listeners (panel, update entities).
        try:
            await core.refresh()
            core.signal_updated()
        except BCSError as e:
            _LOGGER.warning("Periodic refresh failed: %s", e)

    # refresh_seconds comes from store.yaml; fall back to 5 minutes.
    interval = timedelta(seconds=int(core.refresh_seconds or 300))
    async_track_time_interval(hass, periodic, interval)
    await async_load_platform(hass, Platform.UPDATE, DOMAIN, {}, config)
    return True

View File

@@ -0,0 +1,323 @@
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml
from .storage import BCSStorage, CustomRepo
from .views import StaticAssetsView, BCSApiView, BCSReadmeView
from .custom_repo_view import BCSCustomRepoView
from .providers import fetch_repo_info, detect_provider, RepoInfo
from .metadata import fetch_repo_metadata, RepoMetadata
_LOGGER = logging.getLogger(__name__)
# Integration domain; must match manifest.json and __init__.py.
DOMAIN = "bahmcloud_store"
class BCSError(Exception):
    """Base error for store-index fetch/parse failures raised by BCSCore."""
@dataclass
class BCSConfig:
    """Static configuration passed to BCSCore."""

    # URL of the remote store.yaml index to download on refresh.
    store_url: str
@dataclass
class RepoItem:
    """One store entry, merged from the remote index or a user-added repo."""

    id: str
    name: str
    url: str
    source: str  # "index" | "custom"
    # Filled in by the provider lookup (fetch_repo_info) during enrichment.
    owner: str | None = None
    provider: str | None = None
    provider_repo_name: str | None = None
    provider_description: str | None = None
    default_branch: str | None = None
    latest_version: str | None = None
    latest_version_source: str | None = None  # "release" | "tag" | None
    # Filled in from repo metadata files (bcs.yaml / hacs.yaml / hacs.json).
    meta_source: str | None = None
    meta_name: str | None = None
    meta_description: str | None = None
    meta_category: str | None = None
    meta_author: str | None = None
    meta_maintainer: str | None = None
class BCSCore:
    """Central coordinator for the store.

    Downloads the remote index, merges it with user-added custom repos,
    enriches each entry with provider info and repo metadata, and exposes
    the result to the HTTP views and listeners.
    """

    def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None:
        self.hass = hass
        self.config = config
        self.storage = BCSStorage(hass)
        # Refresh cadence in seconds; overwritten by store.yaml on refresh().
        self.refresh_seconds: int = 300
        # repo_id -> RepoItem; replaced wholesale at the end of refresh().
        self.repos: dict[str, RepoItem] = {}
        # Zero-argument callbacks invoked by signal_updated().
        self._listeners: list[callable] = []
        self.version: str = self._read_manifest_version()

    def _read_manifest_version(self) -> str:
        """Return the integration version from manifest.json, or "unknown"."""
        try:
            manifest_path = Path(__file__).resolve().parent / "manifest.json"
            data = json.loads(manifest_path.read_text(encoding="utf-8"))
            v = data.get("version")
            return str(v) if v else "unknown"
        except Exception:
            # A missing/corrupt manifest must never break setup.
            return "unknown"

    def add_listener(self, cb) -> None:
        """Register a zero-argument callback fired after data changes."""
        self._listeners.append(cb)

    def signal_updated(self) -> None:
        """Invoke all listeners; a failing listener never blocks the others."""
        # Iterate over a copy so callbacks may add/remove listeners safely.
        for cb in list(self._listeners):
            try:
                cb()
            except Exception:
                pass

    async def register_http_views(self) -> None:
        """Register static-asset and JSON API views with HA's HTTP server."""
        self.hass.http.register_view(StaticAssetsView())
        self.hass.http.register_view(BCSApiView(self))
        self.hass.http.register_view(BCSReadmeView(self))
        self.hass.http.register_view(BCSCustomRepoView(self))

    def get_repo(self, repo_id: str) -> RepoItem | None:
        """Return the RepoItem for *repo_id*, or None when unknown."""
        return self.repos.get(repo_id)

    async def refresh(self) -> None:
        """Rebuild self.repos from the remote index plus stored custom repos.

        Custom repos overwrite index entries on id collision (inserted
        second). Raises BCSError when the index cannot be fetched/parsed.
        """
        index_repos, refresh_seconds = await self._load_index_repos()
        self.refresh_seconds = refresh_seconds
        custom_repos = await self.storage.list_custom_repos()
        merged: dict[str, RepoItem] = {}
        for item in index_repos:
            merged[item.id] = item
        for c in custom_repos:
            merged[c.id] = RepoItem(
                id=c.id,
                name=(c.name or c.url),
                url=c.url,
                source="custom",
            )
        # Cheap URL-based provider guess first; enrichment may refine it.
        for r in merged.values():
            r.provider = detect_provider(r.url)
        await self._enrich_and_resolve(merged)
        # Publish the fully-enriched mapping only once everything is done.
        self.repos = merged

    async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None:
        """Concurrently fetch provider info and metadata for every repo.

        In-flight requests are capped at 6; per-repo failures are swallowed
        by gather(return_exceptions=True) so one bad repo cannot abort the
        whole refresh.
        """
        sem = asyncio.Semaphore(6)

        async def process_one(r: RepoItem) -> None:
            async with sem:
                info: RepoInfo = await fetch_repo_info(self.hass, r.url)
                r.provider = info.provider or r.provider
                r.owner = info.owner or r.owner
                r.provider_repo_name = info.repo_name
                r.provider_description = info.description
                r.default_branch = info.default_branch or r.default_branch
                r.latest_version = info.latest_version
                r.latest_version_source = info.latest_version_source
                md: RepoMetadata = await fetch_repo_metadata(self.hass, r.url, r.default_branch)
                r.meta_source = md.source
                r.meta_name = md.name
                r.meta_description = md.description
                r.meta_category = md.category
                r.meta_author = md.author
                r.meta_maintainer = md.maintainer
                # A "real" name is one that is neither the URL itself nor URL-like.
                has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http"))
                # Display-name priority: repo metadata > provider repo name > URL.
                if r.meta_name:
                    r.name = r.meta_name
                elif not has_user_or_index_name and r.provider_repo_name:
                    r.name = r.provider_repo_name
                elif not r.name:
                    r.name = r.url

        await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True)

    async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
        """Download and parse store.yaml.

        Returns (repo items, refresh interval seconds). Raises BCSError for
        network failures, non-200 responses, or malformed YAML. Note the
        BCSErrors raised inside the second try block are caught by its own
        `except Exception` and re-wrapped as "Invalid store.yaml: ...".
        """
        session = async_get_clientsession(self.hass)
        try:
            async with session.get(self.config.store_url, timeout=20) as resp:
                if resp.status != 200:
                    raise BCSError(f"store_url returned {resp.status}")
                raw = await resp.text()
        except Exception as e:
            raise BCSError(f"Failed fetching store index: {e}") from e
        try:
            data = ha_yaml.parse_yaml(raw)
            if not isinstance(data, dict):
                raise BCSError("store.yaml must be a mapping")
            refresh_seconds = int(data.get("refresh_seconds", 300))
            repos = data.get("repos", [])
            if not isinstance(repos, list):
                raise BCSError("store.yaml 'repos' must be a list")
            items: list[RepoItem] = []
            for i, r in enumerate(repos):
                # Skip malformed entries rather than failing the whole index.
                if not isinstance(r, dict):
                    continue
                url = str(r.get("url", "")).strip()
                if not url:
                    continue
                name = str(r.get("name") or url).strip()
                items.append(
                    RepoItem(
                        id=f"index:{i}",
                        name=name,
                        url=url,
                        source="index",
                    )
                )
            return items, refresh_seconds
        except Exception as e:
            raise BCSError(f"Invalid store.yaml: {e}") from e

    async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
        """Persist a new custom repo, then refresh and notify listeners."""
        repo = await self.storage.add_custom_repo(url=url, name=name)
        await self.refresh()
        self.signal_updated()
        return repo

    async def remove_custom_repo(self, repo_id: str) -> None:
        """Delete a stored custom repo, then refresh and notify listeners."""
        await self.storage.remove_custom_repo(repo_id)
        await self.refresh()
        self.signal_updated()

    async def list_custom_repos(self) -> list[CustomRepo]:
        """Return the user-added repos from persistent storage."""
        return await self.storage.list_custom_repos()

    def list_repos_public(self) -> list[dict[str, Any]]:
        """Serialize all repos into JSON-safe dicts for the frontend API."""
        out: list[dict[str, Any]] = []
        for r in self.repos.values():
            # Prefer explicit repo metadata over the provider's description.
            resolved_description = r.meta_description or r.provider_description
            out.append(
                {
                    "id": r.id,
                    "name": r.name,
                    "url": r.url,
                    "source": r.source,
                    "owner": r.owner,
                    "provider": r.provider,
                    "meta_source": r.meta_source,
                    "meta_name": r.meta_name,
                    "meta_description": r.meta_description,
                    "meta_category": r.meta_category,
                    "meta_author": r.meta_author,
                    "meta_maintainer": r.meta_maintainer,
                    "provider_repo_name": r.provider_repo_name,
                    "provider_description": r.provider_description,
                    "description": resolved_description,
                    "category": r.meta_category,
                    "latest_version": r.latest_version,
                    "latest_version_source": r.latest_version_source,
                }
            )
        return out

    # ----------------------------
    # README fetching
    # ----------------------------
    def _normalize_repo_name(self, name: str | None) -> str | None:
        """Strip whitespace and a trailing ".git"; None when nothing remains."""
        if not name:
            return None
        n = name.strip()
        if n.endswith(".git"):
            n = n[:-4]
        return n or None

    def _split_owner_repo(self, repo_url: str) -> tuple[str | None, str | None]:
        """Extract (owner, repo) from the first two URL path segments."""
        u = urlparse(repo_url.rstrip("/"))
        parts = [p for p in u.path.strip("/").split("/") if p]
        if len(parts) < 2:
            return None, None
        owner = parts[0].strip() or None
        repo = self._normalize_repo_name(parts[1])
        return owner, repo

    def _is_github(self, repo_url: str) -> bool:
        """True when the URL's host contains github.com."""
        return "github.com" in urlparse(repo_url).netloc.lower()

    def _is_gitea(self, repo_url: str) -> bool:
        # NOTE(review): for URLs with no host this returns '' (falsy but not
        # False), despite the -> bool annotation; callers only use it in a
        # boolean context so it works, but bool(host) would be cleaner.
        # Any non-GitHub/non-gitlab.com host is treated as Gitea.
        host = urlparse(repo_url).netloc.lower()
        return host and "github.com" not in host and "gitlab.com" not in host

    async def _fetch_text(self, url: str) -> str | None:
        """GET *url*; return body text on HTTP 200, None on any failure."""
        session = async_get_clientsession(self.hass)
        try:
            async with session.get(url, timeout=20) as resp:
                if resp.status != 200:
                    return None
                return await resp.text()
        except Exception:
            # Best-effort: treat network errors the same as "not found".
            return None

    async def fetch_readme_markdown(self, repo_id: str) -> str | None:
        """Fetch the repo's README markdown via raw URLs, or None.

        Tries common README filename casings on GitHub raw or Gitea raw
        endpoints; GitLab is not supported here and yields None.
        """
        repo = self.get_repo(repo_id)
        if not repo:
            return None
        owner, name = self._split_owner_repo(repo.url)
        if not owner or not name:
            return None
        branch = repo.default_branch or "main"
        filenames = ["README.md", "readme.md", "README.MD"]
        candidates: list[str] = []
        if self._is_github(repo.url):
            # raw github content
            base = f"https://raw.githubusercontent.com/{owner}/{name}/{branch}"
            candidates.extend([f"{base}/{fn}" for fn in filenames])
        elif self._is_gitea(repo.url):
            u = urlparse(repo.url.rstrip("/"))
            root = f"{u.scheme}://{u.netloc}/{owner}/{name}"
            # gitea raw endpoints (both common forms)
            bases = [
                f"{root}/raw/branch/{branch}",
                f"{root}/raw/{branch}",
            ]
            for b in bases:
                candidates.extend([f"{b}/{fn}" for fn in filenames])
        else:
            return None
        # First candidate that yields a non-empty body wins.
        for url in candidates:
            txt = await self._fetch_text(url)
            if txt:
                return txt
        return None

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from homeassistant.components.http import HomeAssistantView
if TYPE_CHECKING:
from .core import BCSCore
class BCSCustomRepoView(HomeAssistantView):
    """
    DELETE /api/bcs/custom_repo?id=...
    """

    requires_auth = True
    name = "bcs_custom_repo_api"
    url = "/api/bcs/custom_repo"

    def __init__(self, core: "BCSCore") -> None:
        self.core = core

    async def delete(self, request):
        """Remove the custom repo named by the ``id`` query parameter."""
        repo_id = request.query.get("id", "")
        repo_id = repo_id.strip()
        if repo_id:
            await self.core.remove_custom_repo(repo_id)
            return self.json({"ok": True})
        return self.json({"error": "id missing"}, status_code=400)

View File

@@ -0,0 +1,9 @@
{
"domain": "bahmcloud_store",
"name": "Bahmcloud Store",
"version": "0.4.0",
"documentation": "https://git.bahmcloud.de/bahmcloud/bahmcloud_store",
"requirements": [],
"codeowners": [],
"iot_class": "local_polling"
}

View File

@@ -0,0 +1,168 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from urllib.parse import urlparse
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml
# Module-level logger (HA convention: one logger per module).
_LOGGER = logging.getLogger(__name__)
@dataclass
class RepoMetadata:
    """Metadata parsed from a repo's bcs.yaml / hacs.yaml / hacs.json file."""

    source: str | None = None  # "bcs.yaml" | "hacs.yaml" | "hacs.json" | None
    name: str | None = None
    description: str | None = None
    category: str | None = None
    author: str | None = None
    maintainer: str | None = None
def _normalize_repo_name(name: str | None) -> str | None:
if not name:
return None
n = name.strip()
if n.endswith(".git"):
n = n[:-4]
return n or None
def _split_owner_repo(repo_url: str) -> tuple[str | None, str | None]:
u = urlparse(repo_url.rstrip("/"))
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
return None, None
owner = parts[0].strip() or None
repo = _normalize_repo_name(parts[1])
return owner, repo
def _is_github(repo_url: str) -> bool:
return "github.com" in urlparse(repo_url).netloc.lower()
def _is_gitlab(repo_url: str) -> bool:
return "gitlab" in urlparse(repo_url).netloc.lower()
def _is_gitea(repo_url: str) -> bool:
host = urlparse(repo_url).netloc.lower()
return host and ("github.com" not in host) and ("gitlab" not in host)
async def _fetch_text(hass: HomeAssistant, url: str) -> str | None:
    """Download *url* and return its body text, or None on any failure."""
    session = async_get_clientsession(hass)
    try:
        async with session.get(url, timeout=20) as resp:
            if resp.status == 200:
                return await resp.text()
            return None
    except Exception:
        # Best-effort fetch: network errors are reported as "not found".
        return None
def _parse_meta_yaml(raw: str, source: str) -> RepoMetadata:
    """Parse a bcs.yaml/hacs.yaml body into RepoMetadata (best effort).

    Any parse failure or non-mapping document yields an empty RepoMetadata
    that still records *source* so callers know the file existed.
    """
    try:
        parsed = ha_yaml.parse_yaml(raw)
    except Exception:
        return RepoMetadata(source=source)
    if not isinstance(parsed, dict):
        return RepoMetadata(source=source)
    return RepoMetadata(
        source=source,
        name=parsed.get("name"),
        description=parsed.get("description"),
        category=parsed.get("category"),
        author=parsed.get("author"),
        maintainer=parsed.get("maintainer"),
    )
def _parse_meta_hacs_json(raw: str) -> RepoMetadata:
    """Parse a HACS-style hacs.json body; non-string fields are dropped.

    "category" falls back to the legacy "type" key. Any JSON failure or
    non-object document yields an empty RepoMetadata tagged "hacs.json".
    """
    fallback = RepoMetadata(source="hacs.json")
    try:
        data = json.loads(raw)
    except Exception:
        return fallback
    if not isinstance(data, dict):
        return fallback

    def _text(value) -> str | None:
        # Keep only real strings; anything else becomes None.
        return value if isinstance(value, str) else None

    return RepoMetadata(
        source="hacs.json",
        name=_text(data.get("name")),
        description=_text(data.get("description")),
        category=_text(data.get("category") or data.get("type")),
        author=_text(data.get("author")),
        maintainer=_text(data.get("maintainer")),
    )
async def fetch_repo_metadata(hass: HomeAssistant, repo_url: str, default_branch: str | None) -> RepoMetadata:
    """Fetch and parse the first available metadata file for *repo_url*.

    Builds provider-specific raw-file URLs and returns the first metadata
    file that downloads and parses with a recognized source. Returns an
    empty RepoMetadata when the URL is unusable or nothing is found.
    """
    owner, repo = _split_owner_repo(repo_url)
    if not owner or not repo:
        return RepoMetadata()
    # NOTE(review): assumes "main" when the default branch is unknown —
    # repos whose default branch is e.g. "master" will miss their metadata.
    branch = default_branch or "main"
    # Priority:
    # 1) bcs.yaml
    # 2) hacs.yaml
    # 3) hacs.json
    filenames = ["bcs.yaml", "hacs.yaml", "hacs.json"]
    # (filename, url) pairs, tried strictly in order.
    candidates: list[tuple[str, str]] = []
    if _is_github(repo_url):
        base = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}"
        for fn in filenames:
            candidates.append((fn, f"{base}/{fn}"))
    elif _is_gitlab(repo_url):
        u = urlparse(repo_url.rstrip("/"))
        root = f"{u.scheme}://{u.netloc}/{owner}/{repo}"
        # GitLab raw format
        # https://gitlab.com/<owner>/<repo>/-/raw/<branch>/<file>
        for fn in filenames:
            candidates.append((fn, f"{root}/-/raw/{branch}/{fn}"))
    elif _is_gitea(repo_url):
        u = urlparse(repo_url.rstrip("/"))
        root = f"{u.scheme}://{u.netloc}/{owner}/{repo}"
        # Gitea serves raw files under two common path forms; try both
        # for each filename before moving to the next filename.
        bases = [
            f"{root}/raw/branch/{branch}",
            f"{root}/raw/{branch}",
        ]
        for fn in filenames:
            for b in bases:
                candidates.append((fn, f"{b}/{fn}"))
    else:
        # Unknown provider (e.g. URL without a host): no metadata lookup.
        return RepoMetadata()
    for fn, url in candidates:
        raw = await _fetch_text(hass, url)
        if not raw:
            continue
        if fn.endswith(".json"):
            meta = _parse_meta_hacs_json(raw)
            if meta.source:
                return meta
            continue
        meta = _parse_meta_yaml(raw, fn)
        if meta.source:
            return meta
    return RepoMetadata()