custom_components/bahmcloud_store/core.py updated

2026-01-15 15:04:13 +00:00
parent 3f6da60c0d
commit e863677428


@@ -1,36 +1,20 @@
from __future__ import annotations

import asyncio
import json
import logging
from dataclasses import asdict, dataclass
from datetime import timedelta
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import async_track_time_interval

from .custom_repo_view import BCSCustomRepoView
from .providers import fetch_provider_info
from .storage import BCSStorage, CustomRepo
from .views import StaticAssetsView, BCSApiView, BCSReadmeView

_LOGGER = logging.getLogger(__name__)

DOMAIN = "bahmcloud_store"


class BCSError(Exception):
    """BCS core error."""


@dataclass
class BCSConfig:
    store_url: str


@dataclass
class RepoItem:
@@ -46,214 +30,148 @@ class RepoItem:
    default_branch: str | None = None
    latest_version: str | None = None
    latest_version_source: str | None = None  # "release" | "tag" | None
    description: str | None = None
    category: str | None = None
    meta_name: str | None = None
    meta_description: str | None = None
    meta_category: str | None = None
    meta_author: str | None = None
    meta_maintainer: str | None = None
    meta_source: str | None = None
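Every field on RepoItem now flows straight into the API payload, because as_dict() below serializes with dataclasses.asdict. A minimal round-trip sketch (the leading id/name/url/source fields sit in the unchanged part of the class above this hunk and are assumed here):

from dataclasses import asdict, dataclass


@dataclass
class _RepoItemSketch:
    # Trimmed, hypothetical mirror of RepoItem, for illustration only.
    id: str
    name: str
    url: str
    source: str
    latest_version: str | None = None
    category: str | None = None


item = _RepoItemSketch(id="index:0", name="Demo", url="https://github.com/acme/demo", source="index")
assert asdict(item)["latest_version"] is None  # unset optionals become null in JSON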
class BahmcloudStoreCore:
    def __init__(self, hass: HomeAssistant, store: Any, storage: Any, metadata: Any) -> None:
        self.hass = hass
        self.store = store
        self.storage = storage
        self.metadata = metadata
        self.refresh_seconds: int = 300
        self._listeners: list[callable] = []
        self._repos: list[RepoItem] = []
        self._repos_by_id: dict[str, RepoItem] = {}
        self.version: str = self._read_manifest_version()
        self._refresh_lock = asyncio.Lock()
        self._refresh_unsub = None

    def _read_manifest_version(self) -> str:
        try:
            manifest_path = Path(__file__).resolve().parent / "manifest.json"
            data = json.loads(manifest_path.read_text(encoding="utf-8"))
            v = data.get("version")
            return str(v) if v else "unknown"
        except Exception:
            return "unknown"

    def start(self) -> None:
        # Refresh every ~300 seconds
        self._refresh_unsub = async_track_time_interval(
            self.hass, self._refresh_task, timedelta(seconds=300)
        )
        self.hass.async_create_task(self.refresh())

    def add_listener(self, cb) -> None:
        self._listeners.append(cb)

    async def stop(self) -> None:
        if self._refresh_unsub:
            self._refresh_unsub()
            self._refresh_unsub = None

    def signal_updated(self) -> None:
        for cb in list(self._listeners):
            try:
                cb()
            except Exception:
                pass

    async def register_http_views(self) -> None:
        self.hass.http.register_view(StaticAssetsView())
        self.hass.http.register_view(BCSApiView(self))
        self.hass.http.register_view(BCSReadmeView(self))
        self.hass.http.register_view(BCSCustomRepoView(self))

    async def _refresh_task(self, _now: Any) -> None:
        await self.refresh()

    def get_repo(self, repo_id: str) -> RepoItem | None:
        return self._repos_by_id.get(repo_id)
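The listener mechanism stays a plain callback list: add_listener() registers a zero-argument callable, and signal_updated() fires them all while swallowing individual failures. A consumer sketch (the sensor class is hypothetical, not part of this integration):

class StoreRepoCountSensor:
    """Hypothetical consumer that re-reads repo state on each update signal."""

    def __init__(self, core: BahmcloudStoreCore) -> None:
        self._core = core
        self.state = 0
        core.add_listener(self._on_store_updated)

    def _on_store_updated(self) -> None:
        # Runs synchronously inside signal_updated(); keep it cheap.
        self.state = len(self._core.as_dict()["repos"])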
    async def refresh(self) -> None:
        async with self._refresh_lock:
            try:
                index_repos = await self.store.load_index_repos()
                custom_repos = await self.storage.list_custom_repos()
                merged: list[RepoItem] = []
                by_id: dict[str, RepoItem] = {}
                # index repos
                for r in index_repos:
                    item = RepoItem(
                        id=str(r.get("id")),
                        name=str(r.get("name") or "Unnamed repository"),
                        url=str(r.get("url") or ""),
                        source="index",
                        description=r.get("description") if isinstance(r.get("description"), str) else None,
                        category=r.get("category") if isinstance(r.get("category"), str) else None,
                    )
                    merged.append(item)
                    by_id[item.id] = item
                # custom repos
                for r in custom_repos:
                    item = RepoItem(
                        id=str(r.get("id")),
                        name=str(r.get("name") or "Custom repository"),
                        url=str(r.get("url") or ""),
                        source="custom",
                    )
                    merged.append(item)
                    by_id[item.id] = item
                self._repos = merged
                self._repos_by_id = by_id
                # Enrich from providers + metadata
                await self._enrich_repos()
            except Exception as e:
                _LOGGER.exception("BCS refresh failed: %s", e)
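refresh() reads both sources defensively as lists of plain dicts via .get(). The actual schema lives in store.load_index_repos() and storage.list_custom_repos(), which this diff does not show; inferred from the keys accessed above, the input plausibly looks like this (assumption, not confirmed by the commit):

index_repos = [
    {
        "id": "index:0",
        "name": "Awesome Integration",
        "url": "https://github.com/acme/awesome-integration",
        "description": "Only str values survive the isinstance checks.",
        "category": "integration",
    },
]
custom_repos = [
    # A missing name falls back to "Custom repository" above.
    {"id": "custom:1", "name": None, "url": "https://gitea.example.com/me/my-repo"},
]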
    async def _enrich_repos(self) -> None:
        # Provider + metadata enrichment in sequence (safe + avoids rate spikes)
        for repo in list(self._repos):
            try:
                info = await fetch_provider_info(self.hass, repo.url)
                repo.owner = info.owner
                repo.provider = info.provider
                repo.provider_repo_name = info.repo_name
                repo.provider_description = info.description
                repo.default_branch = info.default_branch
                repo.latest_version = info.latest_version
                repo.latest_version_source = info.latest_version_source
                # Metadata (optional)
                meta = await self.metadata.fetch_repo_metadata(repo.url, repo.default_branch or "main")
                if isinstance(meta, dict):
                    if isinstance(meta.get("name"), str) and meta.get("name").strip():
                        repo.name = meta["name"].strip()
                    if isinstance(meta.get("description"), str) and meta.get("description").strip():
                        repo.description = meta["description"].strip()
                    if isinstance(meta.get("category"), str) and meta.get("category").strip():
                        repo.category = meta["category"].strip()
                    if isinstance(meta.get("author"), str) and meta.get("author").strip():
                        repo.meta_author = meta["author"].strip()
                    if isinstance(meta.get("maintainer"), str) and meta.get("maintainer").strip():
                        repo.meta_maintainer = meta["maintainer"].strip()
                    if isinstance(meta.get("source"), str) and meta.get("source").strip():
                        repo.meta_source = meta["source"].strip()
            except Exception as e:
                _LOGGER.debug("BCS enrich failed for %s: %s", repo.url, e)
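Enrichment is now deliberately sequential, trading speed for predictable load on the providers; the previous revision fanned out through asyncio.Semaphore(6) and asyncio.gather. If parallelism is ever wanted back, a bounded sketch could look like this (the _enrich_single helper is hypothetical, standing in for the per-repo body above):

    async def _enrich_repos_bounded(self, limit: int = 6) -> None:
        """Sketch only: bounded fan-out variant of _enrich_repos()."""
        sem = asyncio.Semaphore(limit)

        async def one(repo: RepoItem) -> None:
            async with sem:
                await self._enrich_single(repo)  # hypothetical per-repo helper

        # return_exceptions=True keeps one failing repo from aborting the rest.
        await asyncio.gather(*(one(r) for r in list(self._repos)), return_exceptions=True)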
    async def add_custom_repo(self, url: str, name: str | None) -> None:
        await self.storage.add_custom_repo(url, name)
        await self.refresh()
        self.signal_updated()

    async def remove_custom_repo(self, repo_id: str) -> None:
        await self.storage.remove_custom_repo(repo_id)
        await self.refresh()
        self.signal_updated()

    async def list_custom_repos(self) -> list[CustomRepo]:
        return await self.storage.list_custom_repos()
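Because both mutators refresh and then signal, an HTTP handler only needs to parse input and delegate. A minimal sketch of such a view, assuming Home Assistant's stock HomeAssistantView base (the real BCSCustomRepoView lives in custom_repo_view.py and is not shown in this diff):

from homeassistant.components.http import HomeAssistantView


class _AddRepoViewSketch(HomeAssistantView):
    """Hypothetical minimal handler; illustration only."""

    url = "/api/bahmcloud_store/custom_repo"
    name = "api:bahmcloud_store:custom_repo"

    def __init__(self, core: BahmcloudStoreCore) -> None:
        self._core = core

    async def post(self, request):
        data = await request.json()
        await self._core.add_custom_repo(str(data["url"]), data.get("name"))
        return self.json({"ok": True})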
    def as_dict(self) -> dict[str, Any]:
        # Render repos to JSON-safe output
        repos_out = []
        for r in self._repos:
            repos_out.append(asdict(r))
        return {
            "ok": True,
            "version": self.store.get_version(),
            "repos": repos_out,
        }
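With asdict() every RepoItem field lands in the payload, nulls included, so the frontend no longer depends on a hand-maintained key list. Roughly what a consumer receives (illustrative values):

{
    "ok": True,
    "version": "1.4.2",  # whatever store.get_version() reports
    "repos": [
        {
            "id": "index:0",
            "name": "Awesome Integration",
            "url": "https://github.com/acme/awesome-integration",
            "source": "index",
            "latest_version": "v2.1.0",
            "latest_version_source": "release",
            "meta_author": "acme",
            # ...remaining RepoItem fields, None/null when unset
        }
    ],
}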
    # ----------------------------
    # README fetching
    # ----------------------------

    def _normalize_repo_name(self, name: str) -> str:
        n = (name or "").strip()
        if n.endswith(".git"):
            n = n[: -len(".git")]
        return n.strip()

    def _split_owner_repo(self, repo_url: str) -> tuple[str | None, str | None]:
        u = urlparse(repo_url.rstrip("/"))
@@ -274,7 +192,7 @@ class BCSCore:
    async def _fetch_text(self, url: str) -> str | None:
        session = async_get_clientsession(self.hass)
        try:
            async with session.get(url, headers={"User-Agent": "BahmcloudStore (Home Assistant)"}, timeout=20) as resp:
                if resp.status != 200:
                    return None
                return await resp.text()
@@ -282,42 +200,100 @@ class BCSCore:
            return None
    async def fetch_readme_markdown(self, repo_id: str) -> str | None:
        """Fetch README markdown for a repository (GitHub, Gitea, GitLab).

        The implementation is intentionally defensive:
        - tries multiple common filenames
        - tries multiple branches (default, main, master) when default is unknown
        - uses public raw endpoints (no tokens needed for public repos)
        """
        repo = self.get_repo(repo_id)
        if not repo:
            return None
        repo_url = (repo.url or "").strip()
        if not repo_url:
            return None
        provider = (repo.provider or "").strip().lower()
        u = urlparse(repo_url.rstrip("/"))
        host = (u.netloc or "").lower()
        # Determine provider if not present (best-effort, do not override explicit provider)
        if not provider:
            if "github.com" in host:
                provider = "github"
            elif "gitlab.com" in host:
                provider = "gitlab"
            else:
                provider = "gitea"
        # Branch fallbacks
        branch_candidates: list[str] = []
        if isinstance(repo.default_branch, str) and repo.default_branch.strip():
            branch_candidates.append(repo.default_branch.strip())
        for b in ("main", "master"):
            if b not in branch_candidates:
                branch_candidates.append(b)
        # Filename fallbacks
        filenames = [
            "README.md",
            "readme.md",
            "README.MD",
            "README.rst",
            "README",
        ]
        candidates: list[str] = []
        if provider == "github":
            owner, name = self._split_owner_repo(repo_url)
            if not owner or not name:
                return None
            for branch in branch_candidates:
                base = f"https://raw.githubusercontent.com/{owner}/{name}/{branch}"
                candidates.extend([f"{base}/{fn}" for fn in filenames])
        elif provider == "gitea":
            owner, name = self._split_owner_repo(repo_url)
            if not owner or not name:
                return None
            root = f"{u.scheme}://{u.netloc}/{owner}/{name}"
            # gitea raw endpoints (both common forms)
            for branch in branch_candidates:
                bases = [
                    f"{root}/raw/branch/{branch}",
                    f"{root}/raw/{branch}",
                ]
                for b in bases:
                    candidates.extend([f"{b}/{fn}" for fn in filenames])
        elif provider == "gitlab":
            # GitLab supports nested groups, so we must keep the full path.
            parts = [p for p in u.path.strip("/").split("/") if p]
            if len(parts) < 2:
                return None
            repo_name = self._normalize_repo_name(parts[-1])
            owner_path = "/".join(parts[:-1]).strip("/")
            if not owner_path or not repo_name:
                return None
            root = f"{u.scheme}://{u.netloc}/{owner_path}/{repo_name}"
            for branch in branch_candidates:
                bases = [
                    f"{root}/-/raw/{branch}",
                    f"{root}/raw/{branch}",
                ]
                for b in bases:
                    candidates.extend([f"{b}/{fn}" for fn in filenames])
        else:
            return None
        # Try all candidates in order. First successful text wins.
        for url in candidates:
            txt = await self._fetch_text(url)
            if txt and txt.strip():
                return txt
        return None
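For a concrete feel of the fallback matrix: a GitHub repo with an unknown default branch produces 2 branches x 5 filenames = 10 candidates, probed in order until one returns non-empty text. For a hypothetical https://github.com/acme/demo:

# https://raw.githubusercontent.com/acme/demo/main/README.md
# https://raw.githubusercontent.com/acme/demo/main/readme.md
# https://raw.githubusercontent.com/acme/demo/main/README.MD
# https://raw.githubusercontent.com/acme/demo/main/README.rst
# https://raw.githubusercontent.com/acme/demo/main/README
# https://raw.githubusercontent.com/acme/demo/master/README.md
# ...and so on through the remaining master/<filename> combinations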