diff --git a/custom_components/bahmcloud_store/core.py b/custom_components/bahmcloud_store/core.py index efbff00..ed7c9ed 100644 --- a/custom_components/bahmcloud_store/core.py +++ b/custom_components/bahmcloud_store/core.py @@ -1,166 +1,610 @@ from __future__ import annotations +import asyncio +import hashlib +import json import logging +import time +import shutil +import tempfile +import zipfile from dataclasses import dataclass +from pathlib import Path from typing import Any +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit, urlparse -from homeassistant.components.update import UpdateEntity, UpdateEntityFeature -from homeassistant.core import HomeAssistant, callback -from homeassistant.helpers.dispatcher import async_dispatcher_connect -from homeassistant.helpers.entity_platform import AddEntitiesCallback -from homeassistant.helpers.entity import EntityCategory +from homeassistant.core import HomeAssistant +from homeassistant.helpers.aiohttp_client import async_get_clientsession +from homeassistant.helpers.dispatcher import async_dispatcher_send +from homeassistant.helpers import issue_registry as ir +from homeassistant.util import yaml as ha_yaml -from .core import DOMAIN, SIGNAL_UPDATED, BCSCore +from .storage import BCSStorage, CustomRepo +from .providers import fetch_repo_info, detect_provider, RepoInfo, fetch_readme_markdown +from .metadata import fetch_repo_metadata, RepoMetadata _LOGGER = logging.getLogger(__name__) +DOMAIN = "bahmcloud_store" -def _pretty_repo_name(core: BCSCore, repo_id: str) -> str: - """Return a human-friendly name for a repo update entity.""" - try: - repo = core.get_repo(repo_id) - if repo and getattr(repo, "name", None): - name = str(repo.name).strip() - if name: - return name - except Exception: - pass - - # Fallbacks - if repo_id.startswith("index:"): - return f"BCS Index {repo_id.split(':', 1)[1]}" - if repo_id.startswith("custom:"): - return f"BCS Custom {repo_id.split(':', 1)[1]}" - return f"BCS {repo_id}" +SIGNAL_UPDATED = f"{DOMAIN}_updated" +RESTART_REQUIRED_ISSUE_ID = "restart_required" -@dataclass(frozen=True) -class _RepoKey: - repo_id: str +class BCSError(Exception): + """BCS core error.""" -class BCSRepoUpdateEntity(UpdateEntity): - """Update entity representing a BCS-managed repository.""" +class BCSInstallError(BCSError): + """BCS installation/update error.""" - _attr_entity_category = EntityCategory.DIAGNOSTIC - _attr_supported_features = UpdateEntityFeature.INSTALL - def __init__(self, core: BCSCore, repo_id: str) -> None: - self._core = core - self._repo_id = repo_id - self._in_progress = False +@dataclass +class BCSConfig: + store_url: str - # Stable unique id (do NOT change) - self._attr_unique_id = f"{DOMAIN}:{repo_id}" - # Human-friendly name in UI - pretty = _pretty_repo_name(core, repo_id) - self._attr_name = pretty +@dataclass +class RepoItem: + id: str + name: str + url: str + source: str # "index" | "custom" - # Title shown in the entity dialog - self._attr_title = pretty + owner: str | None = None + provider: str | None = None + provider_repo_name: str | None = None + provider_description: str | None = None + default_branch: str | None = None - @property - def available(self) -> bool: - repo = self._core.get_repo(self._repo_id) - installed = self._core.get_installed(self._repo_id) - return repo is not None and installed is not None + latest_version: str | None = None + latest_version_source: str | None = None # "release" | "tag" | "atom" | None - @property - def in_progress(self) -> bool | None: - return self._in_progress + meta_source: str | None = None + meta_name: str | None = None + meta_description: str | None = None + meta_category: str | None = None + meta_author: str | None = None + meta_maintainer: str | None = None - @property - def installed_version(self) -> str | None: - installed = self._core.get_installed(self._repo_id) or {} - v = installed.get("installed_version") or installed.get("ref") - return str(v) if v else None - @property - def latest_version(self) -> str | None: - repo = self._core.get_repo(self._repo_id) - if not repo: - return None - v = getattr(repo, "latest_version", None) - return str(v) if v else None +class BCSCore: + def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None: + self.hass = hass + self.config = config + self.storage = BCSStorage(hass) - @property - def update_available(self) -> bool: - latest = self.latest_version - installed = self.installed_version - if not latest or not installed: - return False - return latest != installed + self.refresh_seconds: int = 300 + self.repos: dict[str, RepoItem] = {} + self._listeners: list[callable] = [] - def version_is_newer(self, latest_version: str, installed_version: str) -> bool: - return latest_version != installed_version + # Will be loaded asynchronously (no blocking IO in event loop) + self.version: str = "unknown" - @property - def release_url(self) -> str | None: - repo = self._core.get_repo(self._repo_id) - return getattr(repo, "url", None) if repo else None + # Diagnostics (helps verify refresh behavior) + self.last_index_url: str | None = None + self.last_index_bytes: int | None = None + self.last_index_hash: str | None = None + self.last_index_loaded_at: float | None = None - async def async_install(self, version: str | None, backup: bool, **kwargs: Any) -> None: - if version is not None: - _LOGGER.debug( - "BCS update entity requested specific version=%s (ignored)", version + self._install_lock = asyncio.Lock() + self._installed_cache: dict[str, Any] = {} + + async def async_initialize(self) -> None: + """Async initialization that avoids blocking file IO.""" + self.version = await self._read_manifest_version_async() + await self._refresh_installed_cache() + + # After a successful HA restart, restart-required is no longer relevant. + self._clear_restart_required_issue() + + async def _read_manifest_version_async(self) -> str: + def _read() -> str: + try: + manifest_path = Path(__file__).resolve().parent / "manifest.json" + data = json.loads(manifest_path.read_text(encoding="utf-8")) + v = data.get("version") + return str(v) if v else "unknown" + except Exception: + return "unknown" + + return await self.hass.async_add_executor_job(_read) + + def add_listener(self, cb) -> None: + self._listeners.append(cb) + + def signal_updated(self) -> None: + # Notify entities/platforms (e.g. update entities) that BCS data changed. + async_dispatcher_send(self.hass, SIGNAL_UPDATED) + for cb in list(self._listeners): + try: + cb() + except Exception: + pass + + def _mark_restart_required(self) -> None: + """Show a 'restart required' issue in Home Assistant Settings. + + IMPORTANT: + - is_fixable=True enables the "Fix/OK" button + - the real action is implemented in repairs.py (fix flow) + """ + try: + ir.async_create_issue( + self.hass, + DOMAIN, + RESTART_REQUIRED_ISSUE_ID, + is_fixable=True, # <-- IMPORTANT: show "Fix" button + is_persistent=False, + severity=ir.IssueSeverity.WARNING, + translation_key=RESTART_REQUIRED_ISSUE_ID, + ) + except Exception: + _LOGGER.debug("Failed to create restart required issue", exc_info=True) + + def _clear_restart_required_issue(self) -> None: + """Remove restart required issue after HA restarted.""" + try: + if hasattr(ir, "async_delete_issue"): + ir.async_delete_issue(self.hass, DOMAIN, RESTART_REQUIRED_ISSUE_ID) + elif hasattr(ir, "async_remove_issue"): + ir.async_remove_issue(self.hass, DOMAIN, RESTART_REQUIRED_ISSUE_ID) + except Exception: + _LOGGER.debug("Failed to clear restart required issue", exc_info=True) + + async def full_refresh(self, source: str = "manual") -> None: + """Single refresh entry-point used by both timer and manual button.""" + _LOGGER.info("BCS full refresh triggered (source=%s)", source) + await self.refresh() + self.signal_updated() + + def get_repo(self, repo_id: str) -> RepoItem | None: + return self.repos.get(repo_id) + + def get_installed(self, repo_id: str) -> dict[str, Any] | None: + """Return cached installation info for a repo_id (no I/O).""" + data = (self._installed_cache or {}).get(repo_id) + return data if isinstance(data, dict) else None + + async def refresh(self) -> None: + index_repos, refresh_seconds = await self._load_index_repos() + self.refresh_seconds = refresh_seconds + + custom_repos = await self.storage.list_custom_repos() + + merged: dict[str, RepoItem] = {} + + for item in index_repos: + merged[item.id] = item + + for c in custom_repos: + merged[c.id] = RepoItem( + id=c.id, + name=(c.name or c.url), + url=c.url, + source="custom", ) - self._in_progress = True - self.async_write_ha_state() + for r in merged.values(): + r.provider = detect_provider(r.url) + + await self._enrich_and_resolve(merged) + self.repos = merged + + _LOGGER.info( + "BCS refresh complete: repos=%s (index=%s, custom=%s)", + len(self.repos), + len([r for r in self.repos.values() if r.source == "index"]), + len([r for r in self.repos.values() if r.source == "custom"]), + ) + + async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None: + sem = asyncio.Semaphore(6) + + async def process_one(r: RepoItem) -> None: + async with sem: + info: RepoInfo = await fetch_repo_info(self.hass, r.url) + + r.provider = info.provider or r.provider + r.owner = info.owner or r.owner + r.provider_repo_name = info.repo_name + r.provider_description = info.description + r.default_branch = info.default_branch or r.default_branch + + r.latest_version = info.latest_version + r.latest_version_source = info.latest_version_source + + md: RepoMetadata = await fetch_repo_metadata(self.hass, r.url, r.default_branch) + r.meta_source = md.source + r.meta_name = md.name + r.meta_description = md.description + r.meta_category = md.category + r.meta_author = md.author + r.meta_maintainer = md.maintainer + + has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http")) + if r.meta_name: + r.name = r.meta_name + elif not has_user_or_index_name and r.provider_repo_name: + r.name = r.provider_repo_name + elif not r.name: + r.name = r.url + + await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True) + + def _add_cache_buster(self, url: str) -> str: + parts = urlsplit(url) + q = dict(parse_qsl(parts.query, keep_blank_values=True)) + q["t"] = str(int(time.time())) + new_query = urlencode(q) + return urlunsplit((parts.scheme, parts.netloc, parts.path, new_query, parts.fragment)) + + def _gitea_src_to_raw(self, url: str) -> str: + parts = urlsplit(url) + path = parts.path + path2 = path.replace("/src/branch/", "/raw/branch/") + if path2 == path: + return url + return urlunsplit((parts.scheme, parts.netloc, path2, parts.query, parts.fragment)) + + async def _fetch_store_text(self, url: str) -> str: + session = async_get_clientsession(self.hass) + + headers = { + "User-Agent": "BahmcloudStore (Home Assistant)", + "Cache-Control": "no-cache, no-store, max-age=0", + "Pragma": "no-cache", + "Expires": "0", + } + + async with session.get(url, timeout=30, headers=headers) as resp: + if resp.status != 200: + raise BCSError(f"store_url returned {resp.status}") + return await resp.text() + + async def _load_index_repos(self) -> tuple[list[RepoItem], int]: + store_url = (self.config.store_url or "").strip() + if not store_url: + raise BCSError("store_url is empty") + + url = self._add_cache_buster(store_url) try: - await self._core.update_repo(self._repo_id) - finally: - self._in_progress = False - self.async_write_ha_state() + raw = await self._fetch_store_text(url) + # If we fetched a HTML page (wrong endpoint), attempt raw conversion. + if " None: - """Ensure there is one update entity per installed repo.""" - installed_map = getattr(core, "_installed_cache", {}) or {} - new_entities: list[BCSRepoUpdateEntity] = [] + except Exception as e: + raise BCSError(f"Failed fetching store index: {e}") from e - for repo_id, data in installed_map.items(): - if not isinstance(data, dict): - continue - if repo_id in existing: - continue + # Diagnostics + b = raw.encode("utf-8", errors="replace") + h = hashlib.sha256(b).hexdigest()[:12] + self.last_index_url = url + self.last_index_bytes = len(b) + self.last_index_hash = h + self.last_index_loaded_at = time.time() - ent = BCSRepoUpdateEntity(core, repo_id) - existing[repo_id] = ent - new_entities.append(ent) + _LOGGER.info( + "BCS index loaded: url=%s bytes=%s sha=%s", + self.last_index_url, + self.last_index_bytes, + self.last_index_hash, + ) - if new_entities: - async_add_entities(new_entities) + try: + data = ha_yaml.parse_yaml(raw) + if not isinstance(data, dict): + raise BCSError("store.yaml must be a mapping") - for ent in existing.values(): - ent.async_write_ha_state() + refresh_seconds = int(data.get("refresh_seconds", 300)) + repos = data.get("repos", []) + if not isinstance(repos, list): + raise BCSError("store.yaml 'repos' must be a list") + items: list[RepoItem] = [] + for i, r in enumerate(repos): + if not isinstance(r, dict): + continue + repo_url = str(r.get("url", "")).strip() + if not repo_url: + continue + name = str(r.get("name") or repo_url).strip() -async def async_setup_platform( - hass: HomeAssistant, - config, - async_add_entities: AddEntitiesCallback, - discovery_info=None, -): - """Set up BCS update entities.""" - core: BCSCore | None = hass.data.get(DOMAIN) - if not core: - _LOGGER.debug("BCS core not available, skipping update platform setup") - return + items.append( + RepoItem( + id=f"index:{i}", + name=name, + url=repo_url, + source="index", + ) + ) - entities: dict[str, BCSRepoUpdateEntity] = {} + _LOGGER.info("BCS index parsed: repos=%s refresh_seconds=%s", len(items), refresh_seconds) + return items, refresh_seconds + except Exception as e: + raise BCSError(f"Invalid store.yaml: {e}") from e - _sync_entities(core, entities, async_add_entities) + async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo: + url = str(url or "").strip() + if not url: + raise BCSError("Missing url") - @callback - def _handle_update() -> None: - _sync_entities(core, entities, async_add_entities) + c = await self.storage.add_custom_repo(url, name) + await self.full_refresh(source="custom_repo_add") + return c - async_dispatcher_connect(hass, SIGNAL_UPDATED, _handle_update) \ No newline at end of file + async def remove_custom_repo(self, repo_id: str) -> None: + await self.storage.remove_custom_repo(repo_id) + await self.full_refresh(source="custom_repo_remove") + + async def list_custom_repos(self) -> list[CustomRepo]: + return await self.storage.list_custom_repos() + + def list_repos_public(self) -> list[dict[str, Any]]: + out: list[dict[str, Any]] = [] + + installed_map: dict[str, Any] = getattr(self, "_installed_cache", {}) or {} + if not isinstance(installed_map, dict): + installed_map = {} + + for r in self.repos.values(): + inst = installed_map.get(r.id) + installed = bool(inst) + installed_domains: list[str] = [] + installed_version: str | None = None + installed_manifest_version: str | None = None + + if isinstance(inst, dict): + d = inst.get("domains") or [] + if isinstance(d, list): + installed_domains = [str(x) for x in d if str(x).strip()] + + v = inst.get("installed_version") + installed_version = str(v) if v is not None else None + + mv = inst.get("installed_manifest_version") + installed_manifest_version = str(mv) if mv is not None else None + + out.append( + { + "id": r.id, + "name": r.name, + "url": r.url, + "source": r.source, + "owner": r.owner, + "provider": r.provider, + "repo_name": r.provider_repo_name, + "description": r.provider_description or r.meta_description, + "default_branch": r.default_branch, + "latest_version": r.latest_version, + "latest_version_source": r.latest_version_source, + "category": r.meta_category, + "meta_author": r.meta_author, + "meta_maintainer": r.meta_maintainer, + "meta_source": r.meta_source, + "installed": installed, + "installed_version": installed_version, + "installed_manifest_version": installed_manifest_version, + "installed_domains": installed_domains, + } + ) + return out + + async def fetch_readme_markdown(self, repo_id: str) -> str | None: + repo = self.get_repo(repo_id) + if not repo: + return None + + return await fetch_readme_markdown( + self.hass, + repo.url, + provider=repo.provider, + default_branch=repo.default_branch, + ) + + def _pick_ref_for_install(self, repo: RepoItem) -> str: + if repo.latest_version and str(repo.latest_version).strip(): + return str(repo.latest_version).strip() + if repo.default_branch and str(repo.default_branch).strip(): + return str(repo.default_branch).strip() + return "main" + + def _build_zip_url(self, repo_url: str, ref: str) -> str: + ref = (ref or "").strip() + if not ref: + raise BCSInstallError("Missing ref for ZIP download") + + u = urlparse(repo_url.rstrip("/")) + host = (u.netloc or "").lower() + parts = [p for p in u.path.strip("/").split("/") if p] + if len(parts) < 2: + raise BCSInstallError("Invalid repository URL (missing owner/repo)") + + owner = parts[0] + repo = parts[1] + if repo.endswith(".git"): + repo = repo[:-4] + + if "github.com" in host: + return f"https://codeload.github.com/{owner}/{repo}/zip/{ref}" + + if "gitlab" in host: + base = f"{u.scheme}://{u.netloc}" + path = u.path.strip("/") + if path.endswith(".git"): + path = path[:-4] + return f"{base}/{path}/-/archive/{ref}/{repo}-{ref}.zip" + + base = f"{u.scheme}://{u.netloc}" + path = u.path.strip("/") + if path.endswith(".git"): + path = path[:-4] + return f"{base}/{path}/archive/{ref}.zip" + + async def _download_zip(self, url: str, dest: Path) -> None: + session = async_get_clientsession(self.hass) + headers = { + "User-Agent": "BahmcloudStore (Home Assistant)", + "Cache-Control": "no-cache, no-store, max-age=0", + "Pragma": "no-cache", + } + + async with session.get(url, timeout=120, headers=headers) as resp: + if resp.status != 200: + raise BCSInstallError(f"zip_url returned {resp.status}") + data = await resp.read() + + await self.hass.async_add_executor_job(dest.write_bytes, data) + + async def _extract_zip(self, zip_path: Path, extract_dir: Path) -> None: + def _extract() -> None: + with zipfile.ZipFile(zip_path, "r") as zf: + zf.extractall(extract_dir) + + await self.hass.async_add_executor_job(_extract) + + @staticmethod + def _find_custom_components_root(extract_root: Path) -> Path | None: + direct = extract_root / "custom_components" + if direct.exists() and direct.is_dir(): + return direct + + for child in extract_root.iterdir(): + candidate = child / "custom_components" + if candidate.exists() and candidate.is_dir(): + return candidate + return None + + async def _copy_domain_dir(self, src_domain_dir: Path, domain: str) -> None: + dest_root = Path(self.hass.config.path("custom_components")) + target = dest_root / domain + tmp_target = dest_root / f".bcs_tmp_{domain}_{int(time.time())}" + + def _copy() -> None: + if tmp_target.exists(): + shutil.rmtree(tmp_target, ignore_errors=True) + + shutil.copytree(src_domain_dir, tmp_target, dirs_exist_ok=True) + + if target.exists(): + shutil.rmtree(target, ignore_errors=True) + + tmp_target.rename(target) + + await self.hass.async_add_executor_job(_copy) + + async def _read_installed_manifest_version(self, domain: str) -> str | None: + def _read() -> str | None: + try: + p = Path(self.hass.config.path("custom_components", domain, "manifest.json")) + if not p.exists(): + return None + data = json.loads(p.read_text(encoding="utf-8")) + v = data.get("version") + return str(v) if v else None + except Exception: + return None + + return await self.hass.async_add_executor_job(_read) + + async def _refresh_installed_cache(self) -> None: + try: + items = await self.storage.list_installed_repos() + cache: dict[str, Any] = {} + for it in items: + cache[it.repo_id] = { + "installed": True, + "domains": it.domains, + "installed_version": it.installed_version, + "installed_manifest_version": it.installed_manifest_version, + "ref": it.ref, + "installed_at": it.installed_at, + } + self._installed_cache = cache + except Exception: + self._installed_cache = {} + + async def install_repo(self, repo_id: str) -> dict[str, Any]: + repo = self.get_repo(repo_id) + if not repo: + raise BCSInstallError(f"repo_id not found: {repo_id}") + + async with self._install_lock: + ref = self._pick_ref_for_install(repo) + zip_url = self._build_zip_url(repo.url, ref) + + _LOGGER.info("BCS install started: repo_id=%s ref=%s zip_url=%s", repo_id, ref, zip_url) + + with tempfile.TemporaryDirectory(prefix="bcs_install_") as td: + tmp = Path(td) + zip_path = tmp / "repo.zip" + extract_dir = tmp / "extract" + extract_dir.mkdir(parents=True, exist_ok=True) + + await self._download_zip(zip_url, zip_path) + await self._extract_zip(zip_path, extract_dir) + + cc_root = self._find_custom_components_root(extract_dir) + if not cc_root: + raise BCSInstallError("custom_components folder not found in repository ZIP") + + installed_domains: list[str] = [] + for domain_dir in cc_root.iterdir(): + if not domain_dir.is_dir(): + continue + manifest = domain_dir / "manifest.json" + if not manifest.exists(): + continue + + domain = domain_dir.name + await self._copy_domain_dir(domain_dir, domain) + installed_domains.append(domain) + + if not installed_domains: + raise BCSInstallError("No integrations found under custom_components/ (missing manifest.json)") + + installed_manifest_version = await self._read_installed_manifest_version(installed_domains[0]) + installed_version = ref + + await self.storage.set_installed_repo( + repo_id=repo_id, + url=repo.url, + domains=installed_domains, + installed_version=installed_version, + installed_manifest_version=installed_manifest_version, + ref=ref, + ) + await self._refresh_installed_cache() + + self._mark_restart_required() + + _LOGGER.info( + "BCS install complete: repo_id=%s domains=%s installed_ref=%s manifest_version=%s", + repo_id, + installed_domains, + installed_version, + installed_manifest_version, + ) + self.signal_updated() + return { + "ok": True, + "repo_id": repo_id, + "domains": installed_domains, + "installed_version": installed_version, + "installed_manifest_version": installed_manifest_version, + "restart_required": True, + } + + async def update_repo(self, repo_id: str) -> dict[str, Any]: + _LOGGER.info("BCS update started: repo_id=%s", repo_id) + return await self.install_repo(repo_id) + + async def request_restart(self) -> None: + await self.hass.services.async_call("homeassistant", "restart", {}, blocking=False) \ No newline at end of file