custom_components/bahmcloud_store/core.py aktualisiert

This commit is contained in:
2026-01-15 15:09:33 +00:00
parent e863677428
commit 28b86e19e1

View File

@@ -1,20 +1,36 @@
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import asdict, dataclass
from datetime import timedelta
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util import yaml as ha_yaml
from .providers import fetch_provider_info
from .storage import BCSStorage, CustomRepo
from .views import StaticAssetsView, BCSApiView, BCSReadmeView
from .custom_repo_view import BCSCustomRepoView
from .providers import fetch_repo_info, detect_provider, RepoInfo
from .metadata import fetch_repo_metadata, RepoMetadata
_LOGGER = logging.getLogger(__name__)
DOMAIN = "bahmcloud_store"
class BCSError(Exception):
"""BCS core error."""
@dataclass
class BCSConfig:
store_url: str
@dataclass
class RepoItem:
@@ -30,148 +46,179 @@ class RepoItem:
default_branch: str | None = None
latest_version: str | None = None
latest_version_source: str | None = None
description: str | None = None
category: str | None = None
latest_version_source: str | None = None # "release" | "tag" | None
meta_source: str | None = None
meta_name: str | None = None
meta_description: str | None = None
meta_category: str | None = None
meta_author: str | None = None
meta_maintainer: str | None = None
meta_source: str | None = None
class BahmcloudStoreCore:
def __init__(self, hass: HomeAssistant, store: Any, storage: Any, metadata: Any) -> None:
class BCSCore:
def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None:
self.hass = hass
self.store = store
self.storage = storage
self.metadata = metadata
self.config = config
self.storage = BCSStorage(hass)
self._repos: list[RepoItem] = []
self._repos_by_id: dict[str, RepoItem] = {}
self.refresh_seconds: int = 300
self.repos: dict[str, RepoItem] = {}
self._listeners: list[callable] = []
self._refresh_lock = asyncio.Lock()
self._refresh_unsub = None
self.version: str = self._read_manifest_version()
def start(self) -> None:
# Refresh every ~300 seconds
self._refresh_unsub = async_track_time_interval(
self.hass, self._refresh_task, timedelta(seconds=300)
)
self.hass.async_create_task(self.refresh())
def _read_manifest_version(self) -> str:
try:
manifest_path = Path(__file__).resolve().parent / "manifest.json"
data = json.loads(manifest_path.read_text(encoding="utf-8"))
v = data.get("version")
return str(v) if v else "unknown"
except Exception:
return "unknown"
async def stop(self) -> None:
if self._refresh_unsub:
self._refresh_unsub()
self._refresh_unsub = None
def add_listener(self, cb) -> None:
self._listeners.append(cb)
async def _refresh_task(self, _now: Any) -> None:
await self.refresh()
def signal_updated(self) -> None:
for cb in list(self._listeners):
try:
cb()
except Exception:
pass
async def register_http_views(self) -> None:
self.hass.http.register_view(StaticAssetsView())
self.hass.http.register_view(BCSApiView(self))
self.hass.http.register_view(BCSReadmeView(self))
self.hass.http.register_view(BCSCustomRepoView(self))
def get_repo(self, repo_id: str) -> RepoItem | None:
return self._repos_by_id.get(repo_id)
return self.repos.get(repo_id)
async def refresh(self) -> None:
async with self._refresh_lock:
try:
index_repos = await self.store.load_index_repos()
custom_repos = await self.storage.load_custom_repos()
index_repos, refresh_seconds = await self._load_index_repos()
self.refresh_seconds = refresh_seconds
merged: list[RepoItem] = []
by_id: dict[str, RepoItem] = {}
custom_repos = await self.storage.list_custom_repos()
# index repos
for r in index_repos:
item = RepoItem(
id=str(r.get("id")),
name=str(r.get("name") or "Unnamed repository"),
url=str(r.get("url") or ""),
merged: dict[str, RepoItem] = {}
for item in index_repos:
merged[item.id] = item
for c in custom_repos:
merged[c.id] = RepoItem(
id=c.id,
name=(c.name or c.url),
url=c.url,
source="custom",
)
for r in merged.values():
r.provider = detect_provider(r.url)
await self._enrich_and_resolve(merged)
self.repos = merged
async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None:
sem = asyncio.Semaphore(6)
async def process_one(r: RepoItem) -> None:
async with sem:
info: RepoInfo = await fetch_repo_info(self.hass, r.url)
r.provider = info.provider or r.provider
r.owner = info.owner or r.owner
r.provider_repo_name = info.repo_name
r.provider_description = info.description
r.default_branch = info.default_branch or r.default_branch
r.latest_version = info.latest_version
r.latest_version_source = info.latest_version_source
md: RepoMetadata = await fetch_repo_metadata(self.hass, r.url, r.default_branch)
r.meta_source = md.source
r.meta_name = md.name
r.meta_description = md.description
r.meta_category = md.category
r.meta_author = md.author
r.meta_maintainer = md.maintainer
has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http"))
if r.meta_name:
r.name = r.meta_name
elif not has_user_or_index_name and r.provider_repo_name:
r.name = r.provider_repo_name
elif not r.name:
r.name = r.url
await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True)
async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
session = async_get_clientsession(self.hass)
try:
async with session.get(self.config.store_url, timeout=20) as resp:
if resp.status != 200:
raise BCSError(f"store_url returned {resp.status}")
raw = await resp.text()
except Exception as e:
raise BCSError(f"Failed fetching store index: {e}") from e
try:
data = ha_yaml.parse_yaml(raw)
if not isinstance(data, dict):
raise BCSError("store.yaml must be a mapping")
refresh_seconds = int(data.get("refresh_seconds", 300))
repos = data.get("repos", [])
if not isinstance(repos, list):
raise BCSError("store.yaml 'repos' must be a list")
items: list[RepoItem] = []
for i, r in enumerate(repos):
if not isinstance(r, dict):
continue
url = str(r.get("url", "")).strip()
if not url:
continue
name = str(r.get("name") or url).strip()
items.append(
RepoItem(
id=f"index:{i}",
name=name,
url=url,
source="index",
description=r.get("description") if isinstance(r.get("description"), str) else None,
category=r.get("category") if isinstance(r.get("category"), str) else None,
)
merged.append(item)
by_id[item.id] = item
)
# custom repos
for r in custom_repos:
item = RepoItem(
id=str(r.get("id")),
name=str(r.get("name") or "Custom repository"),
url=str(r.get("url") or ""),
source="custom",
)
merged.append(item)
by_id[item.id] = item
return items, refresh_seconds
except Exception as e:
raise BCSError(f"Invalid store.yaml: {e}") from e
self._repos = merged
self._repos_by_id = by_id
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
url = str(url or "").strip()
if not url:
raise BCSError("Missing url")
# Enrich from providers + metadata
await self._enrich_repos()
except Exception as e:
_LOGGER.exception("BCS refresh failed: %s", e)
async def _enrich_repos(self) -> None:
# Provider + metadata enrichment in sequence (safe + avoids rate spikes)
for repo in list(self._repos):
try:
info = await fetch_provider_info(self.hass, repo.url)
repo.owner = info.owner
repo.provider = info.provider
repo.provider_repo_name = info.repo_name
repo.provider_description = info.description
repo.default_branch = info.default_branch
repo.latest_version = info.latest_version
repo.latest_version_source = info.latest_version_source
# Metadata (optional)
meta = await self.metadata.fetch_repo_metadata(repo.url, repo.default_branch or "main")
if isinstance(meta, dict):
if isinstance(meta.get("name"), str) and meta.get("name").strip():
repo.name = meta["name"].strip()
if isinstance(meta.get("description"), str) and meta.get("description").strip():
repo.description = meta["description"].strip()
if isinstance(meta.get("category"), str) and meta.get("category").strip():
repo.category = meta["category"].strip()
if isinstance(meta.get("author"), str) and meta.get("author").strip():
repo.meta_author = meta["author"].strip()
if isinstance(meta.get("maintainer"), str) and meta.get("maintainer").strip():
repo.meta_maintainer = meta["maintainer"].strip()
if isinstance(meta.get("source"), str) and meta.get("source").strip():
repo.meta_source = meta["source"].strip()
except Exception as e:
_LOGGER.debug("BCS enrich failed for %s: %s", repo.url, e)
async def add_custom_repo(self, url: str, name: str | None) -> None:
await self.storage.add_custom_repo(url, name)
c = await self.storage.add_custom_repo(url, name)
await self.refresh()
self.signal_updated()
return c
async def remove_custom_repo(self, repo_id: str) -> None:
await self.storage.remove_custom_repo(repo_id)
await self.refresh()
self.signal_updated()
def as_dict(self) -> dict[str, Any]:
# Render repos to JSON-safe output
repos_out = []
for r in self._repos:
repos_out.append(asdict(r))
return {
"ok": True,
"version": self.store.get_version(),
"repos": repos_out,
}
def _normalize_repo_name(self, name: str) -> str:
n = (name or "").strip()
def _normalize_repo_name(self, name: str | None) -> str | None:
if not name:
return None
n = name.strip()
if n.endswith(".git"):
n = n[: -len(".git")]
return n.strip()
n = n[:-4]
return n or None
def _split_owner_repo(self, repo_url: str) -> tuple[str | None, str | None]:
u = urlparse(repo_url.rstrip("/"))
@@ -189,10 +236,16 @@ class BahmcloudStoreCore:
host = urlparse(repo_url).netloc.lower()
return host and "github.com" not in host and "gitlab.com" not in host
def _is_gitlab(self, repo_url: str, provider: str | None = None) -> bool:
host = urlparse(repo_url).netloc.lower()
if provider and provider.strip().lower() == "gitlab":
return True
return "gitlab" in host
async def _fetch_text(self, url: str) -> str | None:
session = async_get_clientsession(self.hass)
try:
async with session.get(url, headers={"User-Agent": "BahmcloudStore (Home Assistant)"}, timeout=20) as resp:
async with session.get(url, timeout=20) as resp:
if resp.status != 200:
return None
return await resp.text()
@@ -200,12 +253,12 @@ class BahmcloudStoreCore:
return None
async def fetch_readme_markdown(self, repo_id: str) -> str | None:
"""Fetch README markdown for a repository (GitHub, Gitea, GitLab).
"""Fetch README markdown from GitHub, Gitea or GitLab.
The implementation is intentionally defensive:
Defensive behavior:
- tries multiple common filenames
- tries multiple branches (default, main, master) when default is unknown
- uses public raw endpoints (no tokens needed for public repos)
- tries multiple branches (default, main, master)
- uses public raw endpoints (no tokens required for public repositories)
"""
repo = self.get_repo(repo_id)
if not repo:
@@ -215,39 +268,20 @@ class BahmcloudStoreCore:
if not repo_url:
return None
provider = (repo.provider or "").strip().lower()
u = urlparse(repo_url.rstrip("/"))
host = (u.netloc or "").lower()
# Determine provider if not present (best-effort, do not override explicit provider)
if not provider:
if "github.com" in host:
provider = "github"
elif "gitlab.com" in host:
provider = "gitlab"
else:
provider = "gitea"
# Branch fallbacks
branch_candidates: list[str] = []
if isinstance(repo.default_branch, str) and repo.default_branch.strip():
branch_candidates.append(repo.default_branch.strip())
if repo.default_branch and str(repo.default_branch).strip():
branch_candidates.append(str(repo.default_branch).strip())
for b in ("main", "master"):
if b not in branch_candidates:
branch_candidates.append(b)
# Filename fallbacks
filenames = [
"README.md",
"readme.md",
"README.MD",
"README.rst",
"README",
]
filenames = ["README.md", "readme.md", "README.MD", "README.rst", "README"]
candidates: list[str] = []
if provider == "github":
if self._is_github(repo_url):
owner, name = self._split_owner_repo(repo_url)
if not owner or not name:
return None
@@ -255,33 +289,41 @@ class BahmcloudStoreCore:
base = f"https://raw.githubusercontent.com/{owner}/{name}/{branch}"
candidates.extend([f"{base}/{fn}" for fn in filenames])
elif provider == "gitea":
owner, name = self._split_owner_repo(repo_url)
if not owner or not name:
elif self._is_gitlab(repo_url, repo.provider):
# GitLab can have nested groups: /group/subgroup/repo
u = urlparse(repo_url.rstrip("/"))
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
return None
root = f"{u.scheme}://{u.netloc}/{owner}/{name}"
repo_name = self._normalize_repo_name(parts[-1])
group_path = "/".join(parts[:-1]).strip("/")
if not group_path or not repo_name:
return None
root = f"{u.scheme}://{u.netloc}/{group_path}/{repo_name}"
for branch in branch_candidates:
# Most common GitLab raw form
bases = [
f"{root}/raw/branch/{branch}",
f"{root}/-/raw/{branch}",
# Some instances may expose /raw/<branch> as well
f"{root}/raw/{branch}",
]
for b in bases:
candidates.extend([f"{b}/{fn}" for fn in filenames])
elif provider == "gitlab":
# GitLab supports nested groups, so we must keep the full path.
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
return None
repo_name = self._normalize_repo_name(parts[-1])
owner_path = "/".join(parts[:-1]).strip("/")
if not owner_path or not repo_name:
elif self._is_gitea(repo_url):
owner, name = self._split_owner_repo(repo_url)
if not owner or not name:
return None
root = f"{u.scheme}://{u.netloc}/{owner_path}/{repo_name}"
u = urlparse(repo_url.rstrip("/"))
root = f"{u.scheme}://{u.netloc}/{owner}/{name}"
for branch in branch_candidates:
# gitea raw endpoints (both common forms)
bases = [
f"{root}/-/raw/{branch}",
f"{root}/raw/branch/{branch}",
f"{root}/raw/{branch}",
]
for b in bases:
@@ -290,7 +332,6 @@ class BahmcloudStoreCore:
else:
return None
# Try all candidates in order. First successful text wins.
for url in candidates:
txt = await self._fetch_text(url)
if txt and txt.strip():