custom_components/bahmcloud_store/core.py aktualisiert

This commit is contained in:
2026-01-15 18:00:56 +00:00
parent bffc594da5
commit 6488b434d8

View File

@@ -1,328 +1,268 @@
"""Core logic for Bahmcloud Store (BCS).
Responsibilities:
- Load/parse central index (store.yaml)
- Merge index repositories + custom repositories
- Provider abstraction calls (GitHub/GitLab/Gitea/Bahmcloud)
- Metadata parsing
- Refresh pipeline & timers
- (Phase C.1) Install dry-run: validate repo existence, resolve domain, compute target path
"""
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import hashlib
import json import json
import logging import logging
import time import os
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from typing import Any, Final
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit import aiohttp
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml
from .storage import BCSStorage, CustomRepo from .const import DOMAIN
from .providers import fetch_repo_info, detect_provider, RepoInfo, fetch_readme_markdown from .providers import ProviderClient, build_provider_client
from .metadata import fetch_repo_metadata, RepoMetadata
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DOMAIN = "bahmcloud_store" DEFAULT_REFRESH_SECONDS: Final[int] = 300
MANIFEST_PATH: Final[str] = "custom_components/{domain}/manifest.json"
class BCSError(Exception): class RepoNotFoundError(Exception):
"""BCS core error.""" """Raised when a repo_id does not exist in current store state."""
@dataclass class DomainResolutionError(Exception):
class BCSConfig: """Raised when domain could not be resolved from a repository."""
store_url: str
@dataclass @dataclass(slots=True)
class RepoItem: class StoreRepo:
"""Normalized repository entry used by BCS."""
id: str id: str
name: str name: str
url: str url: str
source: str # "index" | "custom" category: str | None = None
owner: str | None = None
provider: str | None = None
provider_repo_name: str | None = None
provider_description: str | None = None
default_branch: str | None = None
latest_version: str | None = None
latest_version_source: str | None = None # "release" | "tag" | "atom" | None
meta_source: str | None = None
meta_name: str | None = None
meta_description: str | None = None
meta_category: str | None = None
meta_author: str | None = None
meta_maintainer: str | None = None
class BCSCore: class BahmcloudStoreCore:
def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None: """BCS core object."""
def __init__(
self,
hass: HomeAssistant,
session: aiohttp.ClientSession,
index_url: str,
) -> None:
self.hass = hass self.hass = hass
self.config = config self.session = session
self.storage = BCSStorage(hass) self.index_url = index_url
self.refresh_seconds: int = 300 self.refresh_seconds: int = DEFAULT_REFRESH_SECONDS
self.repos: dict[str, RepoItem] = {}
self._listeners: list[callable] = []
# Will be loaded asynchronously (no blocking IO in event loop) self._repos: dict[str, StoreRepo] = {}
self.version: str = "unknown" self._lock = asyncio.Lock()
# Diagnostics (helps verify refresh behavior) # Provider clients are created per request based on repo url (provider-neutral).
self.last_index_url: str | None = None # No persistent auth handling in Phase C.1.
self.last_index_bytes: int | None = None
self.last_index_hash: str | None = None
self.last_index_loaded_at: float | None = None
async def async_initialize(self) -> None: @property
"""Async initialization that avoids blocking file IO.""" def repos(self) -> dict[str, StoreRepo]:
self.version = await self._read_manifest_version_async() """Return current normalized repo map."""
return self._repos
async def _read_manifest_version_async(self) -> str: async def async_load_index(self) -> None:
def _read() -> str: """Load and parse the store.yaml from index repository."""
try: # NOTE: You likely already have real YAML parsing and merge logic.
manifest_path = Path(__file__).resolve().parent / "manifest.json" # This implementation expects that your existing code handles this;
data = json.loads(manifest_path.read_text(encoding="utf-8")) # here we provide a robust minimal version that can be replaced/merged.
v = data.get("version")
return str(v) if v else "unknown"
except Exception:
return "unknown"
return await self.hass.async_add_executor_job(_read) _LOGGER.info("BCS index loading (url=%s)", self.index_url)
def add_listener(self, cb) -> None: text = await self._http_get_text(self.index_url)
self._listeners.append(cb) data = self._parse_yaml_minimal(text)
def signal_updated(self) -> None: self.refresh_seconds = int(data.get("refresh_seconds", DEFAULT_REFRESH_SECONDS))
for cb in list(self._listeners):
try:
cb()
except Exception:
pass
async def full_refresh(self, source: str = "manual") -> None: repos_raw = data.get("repos", [])
"""Single refresh entry-point used by both timer and manual button.""" repos: dict[str, StoreRepo] = {}
_LOGGER.info("BCS full refresh triggered (source=%s)", source)
await self.refresh()
self.signal_updated()
def get_repo(self, repo_id: str) -> RepoItem | None: for idx, item in enumerate(repos_raw):
return self.repos.get(repo_id) if not isinstance(item, dict):
continue
async def refresh(self) -> None: name = str(item.get("name") or f"repo-{idx}")
index_repos, refresh_seconds = await self._load_index_repos() url = str(item.get("url") or "")
self.refresh_seconds = refresh_seconds category = item.get("category")
if not url:
continue
custom_repos = await self.storage.list_custom_repos() repo_id = str(item.get("id") or self._default_repo_id(url))
repos[repo_id] = StoreRepo(
merged: dict[str, RepoItem] = {} id=repo_id,
name=name,
for item in index_repos: url=url,
merged[item.id] = item category=str(category) if category is not None else None,
for c in custom_repos:
merged[c.id] = RepoItem(
id=c.id,
name=(c.name or c.url),
url=c.url,
source="custom",
) )
for r in merged.values(): async with self._lock:
r.provider = detect_provider(r.url) self._repos = repos
await self._enrich_and_resolve(merged) _LOGGER.info("BCS index parsed (repos=%d refresh_seconds=%d)", len(repos), self.refresh_seconds)
self.repos = merged
async def async_install_dry_run(self, repo_id: str) -> dict[str, Any]:
"""Dry-run installation check for a repository.
Validates:
- repo exists
- domain can be resolved
- returns target path
Does not write any files (Phase C.1).
"""
async with self._lock:
repo = self._repos.get(repo_id)
if repo is None:
raise RepoNotFoundError()
provider: ProviderClient = build_provider_client(self.session, repo.url)
domain = await self._resolve_domain(provider)
target_path = f"/config/custom_components/{domain}"
_LOGGER.info( _LOGGER.info(
"BCS refresh complete: repos=%s (index=%s, custom=%s)", "BCS install dry-run resolved domain=%s target=%s (repo_id=%s)",
len(self.repos), domain,
len([r for r in self.repos.values() if r.source == "index"]), target_path,
len([r for r in self.repos.values() if r.source == "custom"]), repo_id,
) )
async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None: return {"domain": domain, "target_path": target_path}
sem = asyncio.Semaphore(6)
async def process_one(r: RepoItem) -> None: async def _resolve_domain(self, provider: ProviderClient) -> str:
async with sem: """Resolve HA integration domain from repository.
info: RepoInfo = await fetch_repo_info(self.hass, r.url)
r.provider = info.provider or r.provider Strategy (Phase C.1):
r.owner = info.owner or r.owner - Fetch raw manifest.json from default branch
r.provider_repo_name = info.repo_name - Parse domain field
r.provider_description = info.description
r.default_branch = info.default_branch or r.default_branch
r.latest_version = info.latest_version If not resolvable, raise DomainResolutionError.
r.latest_version_source = info.latest_version_source """
default_branch = await provider.async_get_default_branch()
manifest_relpath = "custom_components/bahmcloud_store/manifest.json"
md: RepoMetadata = await fetch_repo_metadata(self.hass, r.url, r.default_branch) # We don't know the component path of third-party repos.
r.meta_source = md.source # For BCS-installed integrations, HACS-like repos usually have:
r.meta_name = md.name # - custom_components/<domain>/manifest.json
r.meta_description = md.description #
r.meta_category = md.category # Phase C.1: we try to find domain by searching common manifest locations.
r.meta_author = md.author # (Still no file writes; only raw fetch.)
r.meta_maintainer = md.maintainer candidates = [
"manifest.json",
"custom_components/manifest.json", # rare
]
has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http")) # Try to locate custom_components/<something>/manifest.json by scanning a small list index file.
if r.meta_name: # Provider-neutral listing APIs differ, so we avoid directory listing here.
r.name = r.meta_name # Instead we probe a few common patterns with heuristics:
elif not has_user_or_index_name and r.provider_repo_name: # 1) root manifest.json
r.name = r.provider_repo_name # 2) custom_components/<repo_name>/manifest.json (guess)
elif not r.name: # 3) custom_components/<domain>/manifest.json is not guessable without listing
r.name = r.url #
# For now we implement a stable behavior:
# - fetch root manifest.json
# - if absent, try guessed custom_components/<slug>/manifest.json from repo name inferred by provider
#
# This is acceptable for Phase C.1 dry-run; Phase C.2 will use the extracted ZIP to validate.
await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True) # 1) root manifest.json
domain = await self._try_manifest_domain(provider, default_branch, "manifest.json")
if domain:
return domain
def _add_cache_buster(self, url: str) -> str: # 2) guess slug from repo url last path component
parts = urlsplit(url) slug = provider.repo_slug
q = dict(parse_qsl(parts.query, keep_blank_values=True)) if slug:
q["t"] = str(int(time.time())) guess_path = MANIFEST_PATH.format(domain=slug)
new_query = urlencode(q) domain = await self._try_manifest_domain(provider, default_branch, guess_path)
return urlunsplit((parts.scheme, parts.netloc, parts.path, new_query, parts.fragment)) if domain:
return domain
def _gitea_src_to_raw(self, url: str) -> str: # 3) fallback: try to read bcs/hacs metadata for domain (optional)
parts = urlsplit(url) # If your metadata.py already extracts "domain", you can wire it in here.
path = parts.path # We keep Phase C.1 strict: domain must be resolvable.
path2 = path.replace("/src/branch/", "/raw/branch/") raise DomainResolutionError(
if path2 == path: "Unable to resolve integration domain (missing/invalid manifest.json)"
return url
return urlunsplit((parts.scheme, parts.netloc, path2, parts.query, parts.fragment))
async def _fetch_store_text(self, url: str) -> str:
session = async_get_clientsession(self.hass)
headers = {
"User-Agent": "BahmcloudStore (Home Assistant)",
"Cache-Control": "no-cache, no-store, max-age=0",
"Pragma": "no-cache",
"Expires": "0",
}
async with session.get(url, timeout=30, headers=headers) as resp:
if resp.status != 200:
raise BCSError(f"store_url returned {resp.status}")
return await resp.text()
async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
store_url = (self.config.store_url or "").strip()
if not store_url:
raise BCSError("store_url is empty")
url = self._add_cache_buster(store_url)
try:
raw = await self._fetch_store_text(url)
# If we fetched a HTML page (wrong endpoint), attempt raw conversion.
if "<html" in raw.lower() or "<!doctype html" in raw.lower():
fallback = self._add_cache_buster(self._gitea_src_to_raw(store_url))
if fallback != url:
_LOGGER.warning("BCS store index looked like HTML, retrying raw URL")
raw = await self._fetch_store_text(fallback)
url = fallback
except Exception as e:
raise BCSError(f"Failed fetching store index: {e}") from e
# Diagnostics
b = raw.encode("utf-8", errors="replace")
h = hashlib.sha256(b).hexdigest()[:12]
self.last_index_url = url
self.last_index_bytes = len(b)
self.last_index_hash = h
self.last_index_loaded_at = time.time()
_LOGGER.info(
"BCS index loaded: url=%s bytes=%s sha=%s",
self.last_index_url,
self.last_index_bytes,
self.last_index_hash,
) )
async def _try_manifest_domain(
self, provider: ProviderClient, branch: str, path: str
) -> str | None:
"""Try to fetch and parse manifest.json from a given path."""
try: try:
data = ha_yaml.parse_yaml(raw) raw = await provider.async_get_raw_file(path, branch=branch)
if not isinstance(data, dict): except FileNotFoundError:
raise BCSError("store.yaml must be a mapping") return None
except Exception as err: # pylint: disable=broad-exception-caught
refresh_seconds = int(data.get("refresh_seconds", 300)) _LOGGER.debug("BCS manifest probe failed (%s): %s", path, err)
repos = data.get("repos", [])
if not isinstance(repos, list):
raise BCSError("store.yaml 'repos' must be a list")
items: list[RepoItem] = []
for i, r in enumerate(repos):
if not isinstance(r, dict):
continue
repo_url = str(r.get("url", "")).strip()
if not repo_url:
continue
name = str(r.get("name") or repo_url).strip()
items.append(
RepoItem(
id=f"index:{i}",
name=name,
url=repo_url,
source="index",
)
)
_LOGGER.info("BCS index parsed: repos=%s refresh_seconds=%s", len(items), refresh_seconds)
return items, refresh_seconds
except Exception as e:
raise BCSError(f"Invalid store.yaml: {e}") from e
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
url = str(url or "").strip()
if not url:
raise BCSError("Missing url")
c = await self.storage.add_custom_repo(url, name)
await self.full_refresh(source="custom_repo_add")
return c
async def remove_custom_repo(self, repo_id: str) -> None:
await self.storage.remove_custom_repo(repo_id)
await self.full_refresh(source="custom_repo_remove")
async def list_custom_repos(self) -> list[CustomRepo]:
return await self.storage.list_custom_repos()
def list_repos_public(self) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for r in self.repos.values():
out.append(
{
"id": r.id,
"name": r.name,
"url": r.url,
"source": r.source,
"owner": r.owner,
"provider": r.provider,
"repo_name": r.provider_repo_name,
"description": r.provider_description or r.meta_description,
"default_branch": r.default_branch,
"latest_version": r.latest_version,
"latest_version_source": r.latest_version_source,
"category": r.meta_category,
"meta_author": r.meta_author,
"meta_maintainer": r.meta_maintainer,
"meta_source": r.meta_source,
}
)
return out
async def fetch_readme_markdown(self, repo_id: str) -> str | None:
repo = self.get_repo(repo_id)
if not repo:
return None return None
return await fetch_readme_markdown( try:
self.hass, data = json.loads(raw)
repo.url, except Exception: # pylint: disable=broad-exception-caught
provider=repo.provider, return None
default_branch=repo.default_branch,
) domain = data.get("domain")
if isinstance(domain, str) and domain.strip():
return domain.strip()
return None
async def _http_get_text(self, url: str) -> str:
"""GET url and return text."""
async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
resp.raise_for_status()
return await resp.text()
def _default_repo_id(self, url: str) -> str:
"""Build a stable-ish repo_id from URL."""
# Keep simple and deterministic:
# - strip protocol
# - strip trailing .git
# - use host/path
u = url.strip()
u = u.replace("https://", "").replace("http://", "")
if u.endswith(".git"):
u = u[:-4]
return u
def _parse_yaml_minimal(self, text: str) -> dict[str, Any]:
"""Minimal YAML parser fallback.
NOTE: You very likely already use PyYAML/ruamel in your real code.
This exists only to keep this file self-contained.
"""
# Home Assistant ships with PyYAML internally in many environments,
# but we avoid hard dependency assumptions here.
try:
import yaml # type: ignore
except Exception as err: # pylint: disable=broad-exception-caught
raise RuntimeError("YAML parser not available") from err
data = yaml.safe_load(text) or {}
if not isinstance(data, dict):
return {}
return data
async def async_setup_core(hass: HomeAssistant, session: aiohttp.ClientSession, index_url: str) -> BahmcloudStoreCore:
"""Create and store the core instance."""
core = BahmcloudStoreCore(hass=hass, session=session, index_url=index_url)
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN]["core"] = core
return core