From c04612e159254479d74bdb55f34edc278f91035c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Bachmann?= Date: Fri, 16 Jan 2026 19:10:35 +0000 Subject: [PATCH] 0.5.3 --- custom_components/bahmcloud_store/core.py | 685 +++++----------------- 1 file changed, 134 insertions(+), 551 deletions(-) diff --git a/custom_components/bahmcloud_store/core.py b/custom_components/bahmcloud_store/core.py index 877cf6c..efbff00 100644 --- a/custom_components/bahmcloud_store/core.py +++ b/custom_components/bahmcloud_store/core.py @@ -1,583 +1,166 @@ from __future__ import annotations -import asyncio -import hashlib -import json import logging -import time -import shutil -import tempfile -import zipfile from dataclasses import dataclass -from pathlib import Path from typing import Any -from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit, urlparse -from homeassistant.core import HomeAssistant -from homeassistant.helpers.aiohttp_client import async_get_clientsession -from homeassistant.components import persistent_notification -from homeassistant.util import yaml as ha_yaml +from homeassistant.components.update import UpdateEntity, UpdateEntityFeature +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.dispatcher import async_dispatcher_connect +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.entity import EntityCategory -from .storage import BCSStorage, CustomRepo -from .providers import fetch_repo_info, detect_provider, RepoInfo, fetch_readme_markdown -from .metadata import fetch_repo_metadata, RepoMetadata +from .core import DOMAIN, SIGNAL_UPDATED, BCSCore _LOGGER = logging.getLogger(__name__) -DOMAIN = "bahmcloud_store" - -class BCSError(Exception): - """BCS core error.""" - - -class BCSInstallError(BCSError): - """BCS installation/update error.""" - - -@dataclass -class BCSConfig: - store_url: str - - -@dataclass -class RepoItem: - id: str - name: str - url: str - source: str # "index" | "custom" - - owner: str | None = None - provider: str | None = None - provider_repo_name: str | None = None - provider_description: str | None = None - default_branch: str | None = None - - latest_version: str | None = None - latest_version_source: str | None = None # "release" | "tag" | "atom" | None - - meta_source: str | None = None - meta_name: str | None = None - meta_description: str | None = None - meta_category: str | None = None - meta_author: str | None = None - meta_maintainer: str | None = None - - -class BCSCore: - def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None: - self.hass = hass - self.config = config - self.storage = BCSStorage(hass) - - self.refresh_seconds: int = 300 - self.repos: dict[str, RepoItem] = {} - self._listeners: list[callable] = [] - - # Will be loaded asynchronously (no blocking IO in event loop) - self.version: str = "unknown" - - # Diagnostics (helps verify refresh behavior) - self.last_index_url: str | None = None - self.last_index_bytes: int | None = None - self.last_index_hash: str | None = None - self.last_index_loaded_at: float | None = None - - self._install_lock = asyncio.Lock() - self._installed_cache: dict[str, Any] = {} - - async def async_initialize(self) -> None: - """Async initialization that avoids blocking file IO.""" - self.version = await self._read_manifest_version_async() - await self._refresh_installed_cache() - - async def _read_manifest_version_async(self) -> str: - def _read() -> str: - try: - manifest_path = Path(__file__).resolve().parent / "manifest.json" - data = json.loads(manifest_path.read_text(encoding="utf-8")) - v = data.get("version") - return str(v) if v else "unknown" - except Exception: - return "unknown" - - return await self.hass.async_add_executor_job(_read) - - def add_listener(self, cb) -> None: - self._listeners.append(cb) - - def signal_updated(self) -> None: - for cb in list(self._listeners): - try: - cb() - except Exception: - pass - - async def full_refresh(self, source: str = "manual") -> None: - """Single refresh entry-point used by both timer and manual button.""" - _LOGGER.info("BCS full refresh triggered (source=%s)", source) - await self.refresh() - self.signal_updated() - - def get_repo(self, repo_id: str) -> RepoItem | None: - return self.repos.get(repo_id) - - async def refresh(self) -> None: - index_repos, refresh_seconds = await self._load_index_repos() - self.refresh_seconds = refresh_seconds - - custom_repos = await self.storage.list_custom_repos() - - merged: dict[str, RepoItem] = {} - - for item in index_repos: - merged[item.id] = item - - for c in custom_repos: - merged[c.id] = RepoItem( - id=c.id, - name=(c.name or c.url), - url=c.url, - source="custom", - ) - - for r in merged.values(): - r.provider = detect_provider(r.url) - - await self._enrich_and_resolve(merged) - self.repos = merged - - _LOGGER.info( - "BCS refresh complete: repos=%s (index=%s, custom=%s)", - len(self.repos), - len([r for r in self.repos.values() if r.source == "index"]), - len([r for r in self.repos.values() if r.source == "custom"]), - ) - - async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None: - sem = asyncio.Semaphore(6) - - async def process_one(r: RepoItem) -> None: - async with sem: - info: RepoInfo = await fetch_repo_info(self.hass, r.url) - - r.provider = info.provider or r.provider - r.owner = info.owner or r.owner - r.provider_repo_name = info.repo_name - r.provider_description = info.description - r.default_branch = info.default_branch or r.default_branch - - r.latest_version = info.latest_version - r.latest_version_source = info.latest_version_source - - md: RepoMetadata = await fetch_repo_metadata(self.hass, r.url, r.default_branch) - r.meta_source = md.source - r.meta_name = md.name - r.meta_description = md.description - r.meta_category = md.category - r.meta_author = md.author - r.meta_maintainer = md.maintainer - - has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http")) - if r.meta_name: - r.name = r.meta_name - elif not has_user_or_index_name and r.provider_repo_name: - r.name = r.provider_repo_name - elif not r.name: - r.name = r.url - - await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True) - - def _add_cache_buster(self, url: str) -> str: - parts = urlsplit(url) - q = dict(parse_qsl(parts.query, keep_blank_values=True)) - q["t"] = str(int(time.time())) - new_query = urlencode(q) - return urlunsplit((parts.scheme, parts.netloc, parts.path, new_query, parts.fragment)) - - def _gitea_src_to_raw(self, url: str) -> str: - parts = urlsplit(url) - path = parts.path - path2 = path.replace("/src/branch/", "/raw/branch/") - if path2 == path: - return url - return urlunsplit((parts.scheme, parts.netloc, path2, parts.query, parts.fragment)) - - async def _fetch_store_text(self, url: str) -> str: - session = async_get_clientsession(self.hass) - - headers = { - "User-Agent": "BahmcloudStore (Home Assistant)", - "Cache-Control": "no-cache, no-store, max-age=0", - "Pragma": "no-cache", - "Expires": "0", - } - - async with session.get(url, timeout=30, headers=headers) as resp: - if resp.status != 200: - raise BCSError(f"store_url returned {resp.status}") - return await resp.text() - - async def _load_index_repos(self) -> tuple[list[RepoItem], int]: - store_url = (self.config.store_url or "").strip() - if not store_url: - raise BCSError("store_url is empty") - - url = self._add_cache_buster(store_url) - - try: - raw = await self._fetch_store_text(url) - - # If we fetched a HTML page (wrong endpoint), attempt raw conversion. - if " CustomRepo: - url = str(url or "").strip() - if not url: - raise BCSError("Missing url") - - c = await self.storage.add_custom_repo(url, name) - await self.full_refresh(source="custom_repo_add") - return c - - async def remove_custom_repo(self, repo_id: str) -> None: - await self.storage.remove_custom_repo(repo_id) - await self.full_refresh(source="custom_repo_remove") - - async def list_custom_repos(self) -> list[CustomRepo]: - return await self.storage.list_custom_repos() - - def list_repos_public(self) -> list[dict[str, Any]]: - out: list[dict[str, Any]] = [] - - installed_map: dict[str, Any] = getattr(self, "_installed_cache", {}) or {} - if not isinstance(installed_map, dict): - installed_map = {} - - for r in self.repos.values(): - inst = installed_map.get(r.id) - installed = bool(inst) - installed_domains: list[str] = [] - installed_version: str | None = None - installed_manifest_version: str | None = None - - if isinstance(inst, dict): - d = inst.get("domains") or [] - if isinstance(d, list): - installed_domains = [str(x) for x in d if str(x).strip()] - - # IMPORTANT: this is the ref we installed (tag/release/branch) - v = inst.get("installed_version") - installed_version = str(v) if v is not None else None - - mv = inst.get("installed_manifest_version") - installed_manifest_version = str(mv) if mv is not None else None - - out.append( - { - "id": r.id, - "name": r.name, - "url": r.url, - "source": r.source, - "owner": r.owner, - "provider": r.provider, - "repo_name": r.provider_repo_name, - "description": r.provider_description or r.meta_description, - "default_branch": r.default_branch, - "latest_version": r.latest_version, - "latest_version_source": r.latest_version_source, - "category": r.meta_category, - "meta_author": r.meta_author, - "meta_maintainer": r.meta_maintainer, - "meta_source": r.meta_source, - "installed": installed, - "installed_version": installed_version, - "installed_manifest_version": installed_manifest_version, - "installed_domains": installed_domains, - } - ) - return out - - async def fetch_readme_markdown(self, repo_id: str) -> str | None: - repo = self.get_repo(repo_id) +def _pretty_repo_name(core: BCSCore, repo_id: str) -> str: + """Return a human-friendly name for a repo update entity.""" + try: + repo = core.get_repo(repo_id) + if repo and getattr(repo, "name", None): + name = str(repo.name).strip() + if name: + return name + except Exception: + pass + + # Fallbacks + if repo_id.startswith("index:"): + return f"BCS Index {repo_id.split(':', 1)[1]}" + if repo_id.startswith("custom:"): + return f"BCS Custom {repo_id.split(':', 1)[1]}" + return f"BCS {repo_id}" + + +@dataclass(frozen=True) +class _RepoKey: + repo_id: str + + +class BCSRepoUpdateEntity(UpdateEntity): + """Update entity representing a BCS-managed repository.""" + + _attr_entity_category = EntityCategory.DIAGNOSTIC + _attr_supported_features = UpdateEntityFeature.INSTALL + + def __init__(self, core: BCSCore, repo_id: str) -> None: + self._core = core + self._repo_id = repo_id + self._in_progress = False + + # Stable unique id (do NOT change) + self._attr_unique_id = f"{DOMAIN}:{repo_id}" + + # Human-friendly name in UI + pretty = _pretty_repo_name(core, repo_id) + self._attr_name = pretty + + # Title shown in the entity dialog + self._attr_title = pretty + + @property + def available(self) -> bool: + repo = self._core.get_repo(self._repo_id) + installed = self._core.get_installed(self._repo_id) + return repo is not None and installed is not None + + @property + def in_progress(self) -> bool | None: + return self._in_progress + + @property + def installed_version(self) -> str | None: + installed = self._core.get_installed(self._repo_id) or {} + v = installed.get("installed_version") or installed.get("ref") + return str(v) if v else None + + @property + def latest_version(self) -> str | None: + repo = self._core.get_repo(self._repo_id) if not repo: return None + v = getattr(repo, "latest_version", None) + return str(v) if v else None - return await fetch_readme_markdown( - self.hass, - repo.url, - provider=repo.provider, - default_branch=repo.default_branch, - ) + @property + def update_available(self) -> bool: + latest = self.latest_version + installed = self.installed_version + if not latest or not installed: + return False + return latest != installed - def _pick_ref_for_install(self, repo: RepoItem) -> str: - # Prefer latest_version (release/tag/atom-derived), fallback to default branch, then main. - if repo.latest_version and str(repo.latest_version).strip(): - return str(repo.latest_version).strip() - if repo.default_branch and str(repo.default_branch).strip(): - return str(repo.default_branch).strip() - return "main" + def version_is_newer(self, latest_version: str, installed_version: str) -> bool: + return latest_version != installed_version - def _build_zip_url(self, repo_url: str, ref: str) -> str: - """Build a public ZIP download URL (provider-neutral, no tokens). + @property + def release_url(self) -> str | None: + repo = self._core.get_repo(self._repo_id) + return getattr(repo, "url", None) if repo else None - Supports: - - GitHub: codeload - - GitLab: /-/archive/ - - Gitea (incl. Bahmcloud): /archive/.zip - """ - ref = (ref or "").strip() - if not ref: - raise BCSInstallError("Missing ref for ZIP download") + async def async_install(self, version: str | None, backup: bool, **kwargs: Any) -> None: + if version is not None: + _LOGGER.debug( + "BCS update entity requested specific version=%s (ignored)", version + ) - u = urlparse(repo_url.rstrip("/")) - host = (u.netloc or "").lower() - parts = [p for p in u.path.strip("/").split("/") if p] - if len(parts) < 2: - raise BCSInstallError("Invalid repository URL (missing owner/repo)") + self._in_progress = True + self.async_write_ha_state() - owner = parts[0] - repo = parts[1] - if repo.endswith(".git"): - repo = repo[:-4] - - if "github.com" in host: - return f"https://codeload.github.com/{owner}/{repo}/zip/{ref}" - - if "gitlab" in host: - base = f"{u.scheme}://{u.netloc}" - path = u.path.strip("/") - if path.endswith(".git"): - path = path[:-4] - return f"{base}/{path}/-/archive/{ref}/{repo}-{ref}.zip" - - base = f"{u.scheme}://{u.netloc}" - path = u.path.strip("/") - if path.endswith(".git"): - path = path[:-4] - return f"{base}/{path}/archive/{ref}.zip" - - async def _download_zip(self, url: str, dest: Path) -> None: - session = async_get_clientsession(self.hass) - headers = { - "User-Agent": "BahmcloudStore (Home Assistant)", - "Cache-Control": "no-cache, no-store, max-age=0", - "Pragma": "no-cache", - } - - async with session.get(url, timeout=120, headers=headers) as resp: - if resp.status != 200: - raise BCSInstallError(f"zip_url returned {resp.status}") - data = await resp.read() - - await self.hass.async_add_executor_job(dest.write_bytes, data) - - async def _extract_zip(self, zip_path: Path, extract_dir: Path) -> None: - def _extract() -> None: - with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(extract_dir) - - await self.hass.async_add_executor_job(_extract) - - @staticmethod - def _find_custom_components_root(extract_root: Path) -> Path | None: - direct = extract_root / "custom_components" - if direct.exists() and direct.is_dir(): - return direct - - for child in extract_root.iterdir(): - candidate = child / "custom_components" - if candidate.exists() and candidate.is_dir(): - return candidate - return None - - async def _copy_domain_dir(self, src_domain_dir: Path, domain: str) -> None: - dest_root = Path(self.hass.config.path("custom_components")) - target = dest_root / domain - tmp_target = dest_root / f".bcs_tmp_{domain}_{int(time.time())}" - - def _copy() -> None: - if tmp_target.exists(): - shutil.rmtree(tmp_target, ignore_errors=True) - - shutil.copytree(src_domain_dir, tmp_target, dirs_exist_ok=True) - - if target.exists(): - shutil.rmtree(target, ignore_errors=True) - - tmp_target.rename(target) - - await self.hass.async_add_executor_job(_copy) - - async def _read_installed_manifest_version(self, domain: str) -> str | None: - def _read() -> str | None: - try: - p = Path(self.hass.config.path("custom_components", domain, "manifest.json")) - if not p.exists(): - return None - data = json.loads(p.read_text(encoding="utf-8")) - v = data.get("version") - return str(v) if v else None - except Exception: - return None - - return await self.hass.async_add_executor_job(_read) - - async def _refresh_installed_cache(self) -> None: try: - items = await self.storage.list_installed_repos() - cache: dict[str, Any] = {} - for it in items: - cache[it.repo_id] = { - "domains": it.domains, - "installed_version": it.installed_version, # BCS ref - "installed_manifest_version": it.installed_manifest_version, - "ref": it.ref, - "installed_at": it.installed_at, - } - self._installed_cache = cache - except Exception: - self._installed_cache = {} + await self._core.update_repo(self._repo_id) + finally: + self._in_progress = False + self.async_write_ha_state() - async def install_repo(self, repo_id: str) -> dict[str, Any]: - repo = self.get_repo(repo_id) - if not repo: - raise BCSInstallError(f"repo_id not found: {repo_id}") - async with self._install_lock: - ref = self._pick_ref_for_install(repo) - zip_url = self._build_zip_url(repo.url, ref) +@callback +def _sync_entities( + core: BCSCore, + existing: dict[str, BCSRepoUpdateEntity], + async_add_entities: AddEntitiesCallback, +) -> None: + """Ensure there is one update entity per installed repo.""" + installed_map = getattr(core, "_installed_cache", {}) or {} + new_entities: list[BCSRepoUpdateEntity] = [] - _LOGGER.info("BCS install started: repo_id=%s ref=%s zip_url=%s", repo_id, ref, zip_url) + for repo_id, data in installed_map.items(): + if not isinstance(data, dict): + continue + if repo_id in existing: + continue - with tempfile.TemporaryDirectory(prefix="bcs_install_") as td: - tmp = Path(td) - zip_path = tmp / "repo.zip" - extract_dir = tmp / "extract" - extract_dir.mkdir(parents=True, exist_ok=True) + ent = BCSRepoUpdateEntity(core, repo_id) + existing[repo_id] = ent + new_entities.append(ent) - await self._download_zip(zip_url, zip_path) - await self._extract_zip(zip_path, extract_dir) + if new_entities: + async_add_entities(new_entities) - cc_root = self._find_custom_components_root(extract_dir) - if not cc_root: - raise BCSInstallError("custom_components folder not found in repository ZIP") + for ent in existing.values(): + ent.async_write_ha_state() - installed_domains: list[str] = [] - for domain_dir in cc_root.iterdir(): - if not domain_dir.is_dir(): - continue - manifest = domain_dir / "manifest.json" - if not manifest.exists(): - continue - domain = domain_dir.name - await self._copy_domain_dir(domain_dir, domain) - installed_domains.append(domain) +async def async_setup_platform( + hass: HomeAssistant, + config, + async_add_entities: AddEntitiesCallback, + discovery_info=None, +): + """Set up BCS update entities.""" + core: BCSCore | None = hass.data.get(DOMAIN) + if not core: + _LOGGER.debug("BCS core not available, skipping update platform setup") + return - if not installed_domains: - raise BCSInstallError("No integrations found under custom_components/ (missing manifest.json)") + entities: dict[str, BCSRepoUpdateEntity] = {} - # informational only (many repos are wrong here) - installed_manifest_version = await self._read_installed_manifest_version(installed_domains[0]) + _sync_entities(core, entities, async_add_entities) - # IMPORTANT: BCS "installed_version" is the ref we installed (tag/release/branch), - # so update logic won't break when manifest.json is 0.0.0 or outdated. - installed_version = ref + @callback + def _handle_update() -> None: + _sync_entities(core, entities, async_add_entities) - await self.storage.set_installed_repo( - repo_id=repo_id, - url=repo.url, - domains=installed_domains, - installed_version=installed_version, - installed_manifest_version=installed_manifest_version, - ref=ref, - ) - await self._refresh_installed_cache() - - persistent_notification.async_create( - self.hass, - "Bahmcloud Store installation finished. A Home Assistant restart is required to load the integration.", - title="Bahmcloud Store", - notification_id="bcs_restart_required", - ) - - _LOGGER.info( - "BCS install complete: repo_id=%s domains=%s installed_ref=%s manifest_version=%s", - repo_id, - installed_domains, - installed_version, - installed_manifest_version, - ) - self.signal_updated() - return { - "ok": True, - "repo_id": repo_id, - "domains": installed_domains, - "installed_version": installed_version, - "installed_manifest_version": installed_manifest_version, - "restart_required": True, - } - - async def update_repo(self, repo_id: str) -> dict[str, Any]: - _LOGGER.info("BCS update started: repo_id=%s", repo_id) - return await self.install_repo(repo_id) - - async def request_restart(self) -> None: - await self.hass.services.async_call("homeassistant", "restart", {}, blocking=False) \ No newline at end of file + async_dispatcher_connect(hass, SIGNAL_UPDATED, _handle_update) \ No newline at end of file