From ce5802721f21fecce627eb0cc882a3fb2d4deb1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Bachmann?= Date: Thu, 15 Jan 2026 19:52:05 +0000 Subject: [PATCH] . --- custom_components/bahmcloud_store/storage.py | 125 ++++++++++++++++--- 1 file changed, 107 insertions(+), 18 deletions(-) diff --git a/custom_components/bahmcloud_store/storage.py b/custom_components/bahmcloud_store/storage.py index 8f359cd..1fe35f5 100644 --- a/custom_components/bahmcloud_store/storage.py +++ b/custom_components/bahmcloud_store/storage.py @@ -1,5 +1,6 @@ from __future__ import annotations +import time import uuid from dataclasses import dataclass from typing import Any @@ -18,19 +19,39 @@ class CustomRepo: name: str | None = None +@dataclass +class InstalledRepo: + repo_id: str + url: str + domains: list[str] + installed_at: int + installed_version: str | None = None + ref: str | None = None + + class BCSStorage: - """Persistent storage for manually added repositories.""" + """Persistent storage for Bahmcloud Store. + + Keys: + - custom_repos: list of manually added repositories + - installed_repos: mapping repo_id -> installed metadata + """ def __init__(self, hass: HomeAssistant) -> None: self.hass = hass - self._store = Store(hass, _STORAGE_VERSION, _STORAGE_KEY) + self._store: Store[dict[str, Any]] = Store(hass, _STORAGE_VERSION, _STORAGE_KEY) async def _load(self) -> dict[str, Any]: - data = await self._store.async_load() - if not data: - return {"custom_repos": []} - if "custom_repos" not in data: + data = await self._store.async_load() or {} + if not isinstance(data, dict): + data = {} + + if "custom_repos" not in data or not isinstance(data.get("custom_repos"), list): data["custom_repos"] = [] + + if "installed_repos" not in data or not isinstance(data.get("installed_repos"), dict): + data["installed_repos"] = {} + return data async def _save(self, data: dict[str, Any]) -> None: @@ -43,24 +64,20 @@ class BCSStorage: for r in repos: if not isinstance(r, dict): continue - rid = str(r.get("id") or "") - url = str(r.get("url") or "") - name = r.get("name") - if rid and url: - out.append(CustomRepo(id=rid, url=url, name=str(name) if name else None)) + rid = r.get("id") + url = r.get("url") + if not rid or not url: + continue + out.append(CustomRepo(id=str(rid), url=str(url), name=r.get("name"))) return out async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo: data = await self._load() repos = data.get("custom_repos", []) - # Deduplicate by URL + # De-duplicate by URL for r in repos: - if isinstance(r, dict) and str(r.get("url", "")).strip() == url.strip(): - # Update name if provided - if name: - r["name"] = name - await self._save(data) + if isinstance(r, dict) and str(r.get("url") or "").strip() == url.strip(): return CustomRepo(id=str(r["id"]), url=str(r["url"]), name=r.get("name")) rid = f"custom:{uuid.uuid4().hex[:10]}" @@ -73,6 +90,78 @@ class BCSStorage: async def remove_custom_repo(self, repo_id: str) -> None: data = await self._load() repos = data.get("custom_repos", []) - data["custom_repos"] = [r for r in repos if not (isinstance(r, dict) and r.get("id") == repo_id)] + data["custom_repos"] = [ + r for r in repos if not (isinstance(r, dict) and r.get("id") == repo_id) + ] await self._save(data) + async def get_installed_repo(self, repo_id: str) -> InstalledRepo | None: + data = await self._load() + installed = data.get("installed_repos", {}) + if not isinstance(installed, dict): + return None + entry = installed.get(repo_id) + if not isinstance(entry, dict): + return None + + try: + domains = entry.get("domains") or [] + if not isinstance(domains, list): + domains = [] + domains = [str(d) for d in domains if str(d).strip()] + + return InstalledRepo( + repo_id=str(entry.get("repo_id") or repo_id), + url=str(entry.get("url") or ""), + domains=domains, + installed_at=int(entry.get("installed_at") or 0), + installed_version=str(entry.get("installed_version")) if entry.get("installed_version") else None, + ref=str(entry.get("ref")) if entry.get("ref") else None, + ) + except Exception: + return None + + async def list_installed_repos(self) -> list[InstalledRepo]: + data = await self._load() + installed = data.get("installed_repos", {}) + out: list[InstalledRepo] = [] + if not isinstance(installed, dict): + return out + for repo_id in list(installed.keys()): + item = await self.get_installed_repo(str(repo_id)) + if item: + out.append(item) + return out + + async def set_installed_repo( + self, + *, + repo_id: str, + url: str, + domains: list[str], + installed_version: str | None, + ref: str | None, + ) -> None: + data = await self._load() + installed = data.get("installed_repos", {}) + if not isinstance(installed, dict): + installed = {} + data["installed_repos"] = installed + + installed[str(repo_id)] = { + "repo_id": str(repo_id), + "url": str(url), + "domains": [str(d) for d in (domains or []) if str(d).strip()], + "installed_at": int(time.time()), + "installed_version": installed_version, + "ref": ref, + } + await self._save(data) + + async def remove_installed_repo(self, repo_id: str) -> None: + data = await self._load() + installed = data.get("installed_repos", {}) + if isinstance(installed, dict) and repo_id in installed: + installed.pop(repo_id, None) + data["installed_repos"] = installed + await self._save(data) \ No newline at end of file