167 lines
5.4 KiB
Python
167 lines
5.4 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.storage import Store
|
|
|
|
_STORAGE_VERSION = 1
|
|
_STORAGE_KEY = "bcs_store"
|
|
|
|
|
|
@dataclass
|
|
class CustomRepo:
|
|
id: str
|
|
url: str
|
|
name: str | None = None
|
|
|
|
|
|
@dataclass
|
|
class InstalledRepo:
|
|
repo_id: str
|
|
url: str
|
|
domains: list[str]
|
|
installed_at: int
|
|
installed_version: str | None = None
|
|
ref: str | None = None
|
|
|
|
|
|
class BCSStorage:
|
|
"""Persistent storage for Bahmcloud Store.
|
|
|
|
Keys:
|
|
- custom_repos: list of manually added repositories
|
|
- installed_repos: mapping repo_id -> installed metadata
|
|
"""
|
|
|
|
def __init__(self, hass: HomeAssistant) -> None:
|
|
self.hass = hass
|
|
self._store: Store[dict[str, Any]] = Store(hass, _STORAGE_VERSION, _STORAGE_KEY)
|
|
|
|
async def _load(self) -> dict[str, Any]:
|
|
data = await self._store.async_load() or {}
|
|
if not isinstance(data, dict):
|
|
data = {}
|
|
|
|
if "custom_repos" not in data or not isinstance(data.get("custom_repos"), list):
|
|
data["custom_repos"] = []
|
|
|
|
if "installed_repos" not in data or not isinstance(data.get("installed_repos"), dict):
|
|
data["installed_repos"] = {}
|
|
|
|
return data
|
|
|
|
async def _save(self, data: dict[str, Any]) -> None:
|
|
await self._store.async_save(data)
|
|
|
|
async def list_custom_repos(self) -> list[CustomRepo]:
|
|
data = await self._load()
|
|
repos = data.get("custom_repos", [])
|
|
out: list[CustomRepo] = []
|
|
for r in repos:
|
|
if not isinstance(r, dict):
|
|
continue
|
|
rid = r.get("id")
|
|
url = r.get("url")
|
|
if not rid or not url:
|
|
continue
|
|
out.append(CustomRepo(id=str(rid), url=str(url), name=r.get("name")))
|
|
return out
|
|
|
|
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
|
|
data = await self._load()
|
|
repos = data.get("custom_repos", [])
|
|
|
|
# De-duplicate by URL
|
|
for r in repos:
|
|
if isinstance(r, dict) and str(r.get("url") or "").strip() == url.strip():
|
|
return CustomRepo(id=str(r["id"]), url=str(r["url"]), name=r.get("name"))
|
|
|
|
rid = f"custom:{uuid.uuid4().hex[:10]}"
|
|
entry = {"id": rid, "url": url.strip(), "name": name.strip() if name else None}
|
|
repos.append(entry)
|
|
data["custom_repos"] = repos
|
|
await self._save(data)
|
|
return CustomRepo(id=rid, url=entry["url"], name=entry["name"])
|
|
|
|
async def remove_custom_repo(self, repo_id: str) -> None:
|
|
data = await self._load()
|
|
repos = data.get("custom_repos", [])
|
|
data["custom_repos"] = [
|
|
r for r in repos if not (isinstance(r, dict) and r.get("id") == repo_id)
|
|
]
|
|
await self._save(data)
|
|
|
|
async def get_installed_repo(self, repo_id: str) -> InstalledRepo | None:
|
|
data = await self._load()
|
|
installed = data.get("installed_repos", {})
|
|
if not isinstance(installed, dict):
|
|
return None
|
|
entry = installed.get(repo_id)
|
|
if not isinstance(entry, dict):
|
|
return None
|
|
|
|
try:
|
|
domains = entry.get("domains") or []
|
|
if not isinstance(domains, list):
|
|
domains = []
|
|
domains = [str(d) for d in domains if str(d).strip()]
|
|
|
|
return InstalledRepo(
|
|
repo_id=str(entry.get("repo_id") or repo_id),
|
|
url=str(entry.get("url") or ""),
|
|
domains=domains,
|
|
installed_at=int(entry.get("installed_at") or 0),
|
|
installed_version=str(entry.get("installed_version")) if entry.get("installed_version") else None,
|
|
ref=str(entry.get("ref")) if entry.get("ref") else None,
|
|
)
|
|
except Exception:
|
|
return None
|
|
|
|
async def list_installed_repos(self) -> list[InstalledRepo]:
|
|
data = await self._load()
|
|
installed = data.get("installed_repos", {})
|
|
out: list[InstalledRepo] = []
|
|
if not isinstance(installed, dict):
|
|
return out
|
|
for repo_id in list(installed.keys()):
|
|
item = await self.get_installed_repo(str(repo_id))
|
|
if item:
|
|
out.append(item)
|
|
return out
|
|
|
|
async def set_installed_repo(
|
|
self,
|
|
*,
|
|
repo_id: str,
|
|
url: str,
|
|
domains: list[str],
|
|
installed_version: str | None,
|
|
ref: str | None,
|
|
) -> None:
|
|
data = await self._load()
|
|
installed = data.get("installed_repos", {})
|
|
if not isinstance(installed, dict):
|
|
installed = {}
|
|
data["installed_repos"] = installed
|
|
|
|
installed[str(repo_id)] = {
|
|
"repo_id": str(repo_id),
|
|
"url": str(url),
|
|
"domains": [str(d) for d in (domains or []) if str(d).strip()],
|
|
"installed_at": int(time.time()),
|
|
"installed_version": installed_version,
|
|
"ref": ref,
|
|
}
|
|
await self._save(data)
|
|
|
|
async def remove_installed_repo(self, repo_id: str) -> None:
|
|
data = await self._load()
|
|
installed = data.get("installed_repos", {})
|
|
if isinstance(installed, dict) and repo_id in installed:
|
|
installed.pop(repo_id, None)
|
|
data["installed_repos"] = installed
|
|
await self._save(data) |