from __future__ import annotations import time import uuid from dataclasses import dataclass from typing import Any from homeassistant.core import HomeAssistant from homeassistant.helpers.storage import Store _STORAGE_VERSION = 1 _STORAGE_KEY = "bcs_store" @dataclass class CustomRepo: id: str url: str name: str | None = None @dataclass class InstalledRepo: repo_id: str url: str domains: list[str] installed_at: int installed_version: str | None = None # BCS "installed ref" (tag/release/branch) installed_manifest_version: str | None = None # informational only ref: str | None = None # kept for backward compatibility / diagnostics class BCSStorage: """Persistent storage for Bahmcloud Store. Keys: - custom_repos: list of manually added repositories - installed_repos: mapping repo_id -> installed metadata """ def __init__(self, hass: HomeAssistant) -> None: self.hass = hass self._store: Store[dict[str, Any]] = Store(hass, _STORAGE_VERSION, _STORAGE_KEY) async def _load(self) -> dict[str, Any]: data = await self._store.async_load() or {} if not isinstance(data, dict): data = {} if "custom_repos" not in data or not isinstance(data.get("custom_repos"), list): data["custom_repos"] = [] if "installed_repos" not in data or not isinstance(data.get("installed_repos"), dict): data["installed_repos"] = {} return data async def _save(self, data: dict[str, Any]) -> None: await self._store.async_save(data) async def list_custom_repos(self) -> list[CustomRepo]: data = await self._load() repos = data.get("custom_repos", []) out: list[CustomRepo] = [] for r in repos: if not isinstance(r, dict): continue rid = r.get("id") url = r.get("url") if not rid or not url: continue out.append(CustomRepo(id=str(rid), url=str(url), name=r.get("name"))) return out async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo: data = await self._load() repos = data.get("custom_repos", []) # De-duplicate by URL for r in repos: if isinstance(r, dict) and str(r.get("url") or "").strip() == url.strip(): return CustomRepo(id=str(r["id"]), url=str(r["url"]), name=r.get("name")) rid = f"custom:{uuid.uuid4().hex[:10]}" entry = {"id": rid, "url": url.strip(), "name": name.strip() if name else None} repos.append(entry) data["custom_repos"] = repos await self._save(data) return CustomRepo(id=rid, url=entry["url"], name=entry["name"]) async def remove_custom_repo(self, repo_id: str) -> None: data = await self._load() repos = data.get("custom_repos", []) data["custom_repos"] = [ r for r in repos if not (isinstance(r, dict) and r.get("id") == repo_id) ] await self._save(data) async def get_installed_repo(self, repo_id: str) -> InstalledRepo | None: data = await self._load() installed = data.get("installed_repos", {}) if not isinstance(installed, dict): return None entry = installed.get(repo_id) if not isinstance(entry, dict): return None try: domains = entry.get("domains") or [] if not isinstance(domains, list): domains = [] domains = [str(d) for d in domains if str(d).strip()] installed_version = entry.get("installed_version") ref = entry.get("ref") # Backward compatibility: # If installed_version wasn't stored, fall back to ref. if (not installed_version) and ref: installed_version = ref installed_manifest_version = entry.get("installed_manifest_version") return InstalledRepo( repo_id=str(entry.get("repo_id") or repo_id), url=str(entry.get("url") or ""), domains=domains, installed_at=int(entry.get("installed_at") or 0), installed_version=str(installed_version) if installed_version else None, installed_manifest_version=str(installed_manifest_version) if installed_manifest_version else None, ref=str(ref) if ref else None, ) except Exception: return None async def list_installed_repos(self) -> list[InstalledRepo]: data = await self._load() installed = data.get("installed_repos", {}) out: list[InstalledRepo] = [] if not isinstance(installed, dict): return out for rid in list(installed.keys()): item = await self.get_installed_repo(str(rid)) if item: out.append(item) return out async def set_installed_repo( self, *, repo_id: str, url: str, domains: list[str], installed_version: str | None, installed_manifest_version: str | None = None, ref: str | None, ) -> None: data = await self._load() installed = data.get("installed_repos", {}) if not isinstance(installed, dict): installed = {} data["installed_repos"] = installed installed[str(repo_id)] = { "repo_id": str(repo_id), "url": str(url), "domains": [str(d) for d in (domains or []) if str(d).strip()], "installed_at": int(time.time()), # IMPORTANT: this is what BCS uses as "installed version" (ref/tag/branch) "installed_version": installed_version, # informational only "installed_manifest_version": installed_manifest_version, # keep ref too (debug/backward compatibility) "ref": ref, } await self._save(data) async def remove_installed_repo(self, repo_id: str) -> None: data = await self._load() installed = data.get("installed_repos", {}) if isinstance(installed, dict) and repo_id in installed: installed.pop(repo_id, None) data["installed_repos"] = installed await self._save(data)