Files
bahmcloud_store/custom_components/bahmcloud_store/storage.py

285 lines
10 KiB
Python

from __future__ import annotations
import time
import uuid
from dataclasses import dataclass
from typing import Any
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
_STORAGE_VERSION = 1
_STORAGE_KEY = "bcs_store"
@dataclass
class CustomRepo:
id: str
url: str
name: str | None = None
@dataclass
class InstalledRepo:
repo_id: str
url: str
domains: list[str]
installed_at: int
install_type: str = "integration"
installed_paths: list[str] | None = None
installed_version: str | None = None # BCS "installed ref" (tag/release/branch)
installed_manifest_version: str | None = None # informational only
ref: str | None = None # kept for backward compatibility / diagnostics
class BCSStorage:
"""Persistent storage for Bahmcloud Store.
Keys:
- custom_repos: list of manually added repositories
- installed_repos: mapping repo_id -> installed metadata
- settings: persistent user settings (e.g. toggles in the UI)
- hacs_cache: cached HACS metadata to improve UX (display names/descriptions)
- repo_cache: cached per-repo enrichment (names/descriptions/versions) to keep the UI populated after restart
"""
def __init__(self, hass: HomeAssistant) -> None:
self.hass = hass
self._store: Store[dict[str, Any]] = Store(hass, _STORAGE_VERSION, _STORAGE_KEY)
async def _load(self) -> dict[str, Any]:
data = await self._store.async_load() or {}
if not isinstance(data, dict):
data = {}
if "custom_repos" not in data or not isinstance(data.get("custom_repos"), list):
data["custom_repos"] = []
if "installed_repos" not in data or not isinstance(data.get("installed_repos"), dict):
data["installed_repos"] = {}
if "settings" not in data or not isinstance(data.get("settings"), dict):
data["settings"] = {}
if "hacs_cache" not in data or not isinstance(data.get("hacs_cache"), dict):
data["hacs_cache"] = {}
if "repo_cache" not in data or not isinstance(data.get("repo_cache"), dict):
data["repo_cache"] = {}
return data
async def get_repo_cache(self) -> dict[str, Any]:
"""Return cached per-repo enrichment data.
Shape:
{
"fetched_at": <unix_ts>,
"repos": {
"<repo_id>": {
"ts": <unix_ts>,
"url": "...",
"name": "...",
"provider_description": "...",
"meta_name": "...",
"meta_description": "...",
"meta_category": "...",
"meta_source": "...",
"latest_version": "...",
"latest_version_source": "...",
"default_branch": "...",
"owner": "...",
"provider_repo_name": "..."
}
}
}
"""
data = await self._load()
cache = data.get("repo_cache", {})
return cache if isinstance(cache, dict) else {}
async def set_repo_cache(self, cache: dict[str, Any]) -> None:
"""Persist cached per-repo enrichment data."""
data = await self._load()
data["repo_cache"] = cache if isinstance(cache, dict) else {}
await self._save(data)
async def get_hacs_cache(self) -> dict[str, Any]:
"""Return cached HACS metadata.
Shape:
{
"fetched_at": <unix_ts>,
"repos": {"owner/repo": {"name": "...", "description": "...", "domain": "..."}}
}
"""
data = await self._load()
cache = data.get("hacs_cache", {})
return cache if isinstance(cache, dict) else {}
async def set_hacs_cache(self, cache: dict[str, Any]) -> None:
"""Persist cached HACS metadata."""
data = await self._load()
data["hacs_cache"] = cache if isinstance(cache, dict) else {}
await self._save(data)
async def get_settings(self) -> dict[str, Any]:
"""Return persistent settings.
Currently used for UI/behavior toggles.
"""
data = await self._load()
settings = data.get("settings", {})
return settings if isinstance(settings, dict) else {}
async def set_settings(self, updates: dict[str, Any]) -> dict[str, Any]:
"""Update persistent settings and return the merged settings."""
data = await self._load()
settings = data.get("settings", {})
if not isinstance(settings, dict):
settings = {}
for k, v in (updates or {}).items():
settings[str(k)] = v
data["settings"] = settings
await self._save(data)
return settings
async def _save(self, data: dict[str, Any]) -> None:
await self._store.async_save(data)
async def list_custom_repos(self) -> list[CustomRepo]:
data = await self._load()
repos = data.get("custom_repos", [])
out: list[CustomRepo] = []
for r in repos:
if not isinstance(r, dict):
continue
rid = r.get("id")
url = r.get("url")
if not rid or not url:
continue
out.append(CustomRepo(id=str(rid), url=str(url), name=r.get("name")))
return out
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
data = await self._load()
repos = data.get("custom_repos", [])
# De-duplicate by URL
for r in repos:
if isinstance(r, dict) and str(r.get("url") or "").strip() == url.strip():
return CustomRepo(id=str(r["id"]), url=str(r["url"]), name=r.get("name"))
rid = f"custom:{uuid.uuid4().hex[:10]}"
entry = {"id": rid, "url": url.strip(), "name": name.strip() if name else None}
repos.append(entry)
data["custom_repos"] = repos
await self._save(data)
return CustomRepo(id=rid, url=entry["url"], name=entry["name"])
async def remove_custom_repo(self, repo_id: str) -> None:
data = await self._load()
repos = data.get("custom_repos", [])
data["custom_repos"] = [
r for r in repos if not (isinstance(r, dict) and r.get("id") == repo_id)
]
await self._save(data)
async def get_installed_repo(self, repo_id: str) -> InstalledRepo | None:
data = await self._load()
installed = data.get("installed_repos", {})
if not isinstance(installed, dict):
return None
entry = installed.get(repo_id)
if not isinstance(entry, dict):
return None
try:
domains = entry.get("domains") or []
if not isinstance(domains, list):
domains = []
domains = [str(d) for d in domains if str(d).strip()]
installed_paths = entry.get("installed_paths") or []
if not isinstance(installed_paths, list):
installed_paths = []
installed_paths = [str(p) for p in installed_paths if str(p).strip()]
install_type = str(entry.get("install_type") or "integration").strip() or "integration"
installed_version = entry.get("installed_version")
ref = entry.get("ref")
# Backward compatibility:
# If installed_version wasn't stored, fall back to ref.
if (not installed_version) and ref:
installed_version = ref
installed_manifest_version = entry.get("installed_manifest_version")
return InstalledRepo(
repo_id=str(entry.get("repo_id") or repo_id),
url=str(entry.get("url") or ""),
domains=domains,
installed_at=int(entry.get("installed_at") or 0),
install_type=install_type,
installed_paths=installed_paths,
installed_version=str(installed_version) if installed_version else None,
installed_manifest_version=str(installed_manifest_version) if installed_manifest_version else None,
ref=str(ref) if ref else None,
)
except Exception:
return None
async def list_installed_repos(self) -> list[InstalledRepo]:
data = await self._load()
installed = data.get("installed_repos", {})
out: list[InstalledRepo] = []
if not isinstance(installed, dict):
return out
for rid in list(installed.keys()):
item = await self.get_installed_repo(str(rid))
if item:
out.append(item)
return out
async def set_installed_repo(
self,
*,
repo_id: str,
url: str,
domains: list[str],
installed_version: str | None = None,
installed_manifest_version: str | None = None,
ref: str | None = None,
install_type: str = "integration",
installed_paths: list[str] | None = None,
) -> None:
data = await self._load()
installed = data.get("installed_repos", {})
if not isinstance(installed, dict):
installed = {}
data["installed_repos"] = installed
installed[str(repo_id)] = {
"repo_id": str(repo_id),
"url": str(url),
"domains": [str(d) for d in (domains or []) if str(d).strip()],
"install_type": str(install_type or "integration"),
"installed_paths": [str(p) for p in (installed_paths or []) if str(p).strip()],
"installed_at": int(time.time()),
# IMPORTANT: this is what BCS uses as "installed version" (ref/tag/branch)
"installed_version": installed_version,
# informational only
"installed_manifest_version": installed_manifest_version,
# keep ref too (debug/backward compatibility)
"ref": ref,
}
await self._save(data)
async def remove_installed_repo(self, repo_id: str) -> None:
data = await self._load()
installed = data.get("installed_repos", {})
if isinstance(installed, dict) and repo_id in installed:
installed.pop(repo_id, None)
data["installed_repos"] = installed
await self._save(data)