From 2c50765d6640ba969b92ecf6b34f16b4a25146a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Bachmann?= Date: Thu, 15 Jan 2026 14:31:58 +0000 Subject: [PATCH] custom_components/bahmcloud_store/store.py aktualisiert --- custom_components/bahmcloud_store/store.py | 360 +++------------------ 1 file changed, 37 insertions(+), 323 deletions(-) diff --git a/custom_components/bahmcloud_store/store.py b/custom_components/bahmcloud_store/store.py index b7d41d5..3152d30 100644 --- a/custom_components/bahmcloud_store/store.py +++ b/custom_components/bahmcloud_store/store.py @@ -1,338 +1,52 @@ from __future__ import annotations -import json -import logging -import shutil -import tempfile -import zipfile from dataclasses import dataclass -from pathlib import Path -from typing import Any -from urllib.parse import urlparse - -from aiohttp import web from homeassistant.core import HomeAssistant -from homeassistant.helpers.aiohttp_client import async_get_clientsession -from homeassistant.components import persistent_notification -from homeassistant.util import yaml as ha_yaml -from homeassistant.components.http import HomeAssistantView +from homeassistant.helpers.storage import Store -_LOGGER = logging.getLogger(__name__) - -DOMAIN = "bahmcloud_store" - - -class StoreError(Exception): - """Store error.""" +STORAGE_KEY = "bahmcloud_store" +STORAGE_VERSION = 1 @dataclass -class StoreConfig: - store_url: str - - -@dataclass -class Package: +class CustomRepo: id: str - name: str - type: str # "integration" | "store" - domain: str - repo: str - owner: str - repository: str - branch: str - source_path: str - - # computed each refresh - latest_version: str | None = None - zip_url: str | None = None - release_url: str | None = None + url: str + name: str | None = None -class BahmcloudStore: - def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None: - self.hass = hass - self.config = config - self.packages: dict[str, Package] = {} - self.refresh_seconds: int = 300 - self._listeners: list[callable] = [] +class BCSStorage: + def __init__(self, hass: HomeAssistant) -> None: + self._store = Store(hass, STORAGE_VERSION, STORAGE_KEY) - def add_listener(self, cb) -> None: - self._listeners.append(cb) + async def list_custom_repos(self) -> list[CustomRepo]: + data = await self._store.async_load() + if not isinstance(data, dict): + return [] + items = data.get("custom_repos", []) + if not isinstance(items, list): + return [] - def signal_entities_updated(self) -> None: - for cb in list(self._listeners): - try: - cb() - except Exception: - pass + out: list[CustomRepo] = [] + for it in items: + if not isinstance(it, dict): + continue + rid = str(it.get("id") or "").strip() + url = str(it.get("url") or "").strip() + name = it.get("name") + name = str(name).strip() if isinstance(name, str) and name.strip() else None + if rid and url: + out.append(CustomRepo(id=rid, url=url, name=name)) + return out - @staticmethod - def _zip_url(repo: str, branch: str) -> str: - return f"{repo.rstrip('/')}/archive/{branch}.zip" + async def add_custom_repo(self, rid: str, url: str, name: str | None) -> None: + repos = await self.list_custom_repos() + if any(r.id == rid for r in repos): + return + repos.append(CustomRepo(id=rid, url=url, name=name)) + await self._store.async_save({"custom_repos": [r.__dict__ for r in repos]}) - @staticmethod - def _base_from_repo(repo_url: str) -> str: - u = urlparse(repo_url.rstrip("/")) - return f"{u.scheme}://{u.netloc}" - - @staticmethod - def _raw_manifest_url(repo: str, branch: str, source_path: str) -> str: - # Example: - # https://git.bahmcloud.de/bahmcloud/easy_proxmox/raw/branch/main/custom_components/easy_proxmox/manifest.json - return f"{repo.rstrip('/')}/raw/branch/{branch}/{source_path.rstrip('/')}/manifest.json" - - async def _fetch_latest_version(self, pkg: Package) -> tuple[str | None, str | None]: - """ - Returns (latest_version, release_url) - Strategy: - 1) releases/latest -> tag_name - 2) tags?limit=1 -> first tag name - 3) fallback: read manifest.json from repo (version field) - """ - session = async_get_clientsession(self.hass) - base = self._base_from_repo(pkg.repo) - - # 1) latest release - latest_release_api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}/releases/latest" - try: - async with session.get(latest_release_api, timeout=20) as resp: - if resp.status == 200: - data = await resp.json() - tag = data.get("tag_name") - html_url = data.get("html_url") - if tag: - return (str(tag), str(html_url) if html_url else None) - except Exception: - pass - - # 2) tags fallback - tags_api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}/tags?limit=1" - try: - async with session.get(tags_api, timeout=20) as resp: - if resp.status == 200: - tags = await resp.json() - if tags and isinstance(tags, list): - name = tags[0].get("name") - if name: - return (str(name), None) - except Exception: - pass - - # 3) fallback: manifest.json version from repo - try: - manifest_url = self._raw_manifest_url(pkg.repo, pkg.branch, pkg.source_path) - async with session.get(manifest_url, timeout=20) as resp: - if resp.status == 200: - text = await resp.text() - data = json.loads(text) - ver = data.get("version") - if ver: - return (str(ver), None) - except Exception: - pass - - return (None, None) - - async def refresh(self) -> None: - session = async_get_clientsession(self.hass) - - try: - async with session.get(self.config.store_url, timeout=20) as resp: - if resp.status != 200: - raise StoreError(f"store_url returned {resp.status}") - raw = await resp.text() - except Exception as e: - raise StoreError(f"Failed fetching store index: {e}") from e - - try: - data = ha_yaml.parse_yaml(raw) - if not isinstance(data, dict): - raise StoreError("store.yaml must be a mapping") - - self.refresh_seconds = int(data.get("refresh_seconds", 300)) - pkgs = data.get("packages", []) - parsed: dict[str, Package] = {} - - for p in pkgs: - pkg = Package( - id=p["id"], - name=p.get("name", p["id"]), - type=p.get("type", "integration"), - domain=p["domain"], - repo=p["repo"], - owner=p["owner"], - repository=p["repository"], - branch=p.get("branch", "main"), - source_path=p["source_path"], - ) - pkg.zip_url = self._zip_url(pkg.repo, pkg.branch) - parsed[pkg.id] = pkg - - # compute latest versions - for pkg in parsed.values(): - latest, rel_url = await self._fetch_latest_version(pkg) - pkg.latest_version = latest or "unknown" - pkg.release_url = rel_url - - self.packages = parsed - - except Exception as e: - raise StoreError(f"Invalid store.yaml: {e}") from e - - def installed_version(self, domain: str) -> str | None: - manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json")) - if not manifest.exists(): - return None - try: - data = json.loads(manifest.read_text(encoding="utf-8")) - return str(data.get("version") or "unknown") - except Exception: - return "unknown" - - def is_installed(self, domain: str) -> bool: - return Path(self.hass.config.path("custom_components", domain)).exists() - - async def install_from_zip(self, pkg: Package) -> None: - """Manual install/update: download ZIP and copy source_path into /config/custom_components/.""" - if not pkg.zip_url: - raise StoreError("zip_url not set") - - session = async_get_clientsession(self.hass) - - with tempfile.TemporaryDirectory() as td: - zip_path = Path(td) / "repo.zip" - extract_dir = Path(td) / "extract" - - async with session.get(pkg.zip_url, timeout=60) as resp: - if resp.status != 200: - raise StoreError(f"zip_url returned {resp.status}") - zip_path.write_bytes(await resp.read()) - - await self.hass.async_add_executor_job(self._extract_zip, zip_path, extract_dir) - - src = self._find_source_path(extract_dir, pkg.source_path) - if not src: - raise StoreError(f"source_path not found in zip: {pkg.source_path}") - - target = Path(self.hass.config.path("custom_components", pkg.domain)) - if target.exists(): - shutil.rmtree(target) - shutil.copytree(src, target) - - # Nach Installation: Entities neu aufbauen (damit es als Update auftaucht) - self.signal_entities_updated() - - persistent_notification.async_create( - self.hass, - ( - f"**{pkg.name}** wurde installiert/aktualisiert.\n\n" - "Bitte Home Assistant **neu starten**, damit die Änderungen aktiv werden." - ), - title="Bahmcloud Store", - notification_id=f"{DOMAIN}_{pkg.domain}_restart_required", - ) - - @staticmethod - def _extract_zip(zip_path: Path, extract_dir: Path) -> None: - extract_dir.mkdir(parents=True, exist_ok=True) - with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(extract_dir) - - @staticmethod - def _find_source_path(extract_root: Path, source_path: str) -> Path | None: - direct = extract_root / source_path - if direct.exists(): - return direct - for child in extract_root.iterdir(): - candidate = child / source_path - if candidate.exists(): - return candidate - return None - - async def register_http_views(self) -> None: - """Register HTTP views for static panel assets and JSON API.""" - self.hass.http.register_view(_StaticView()) - self.hass.http.register_view(_APIView(self)) - - -class _StaticView(HomeAssistantView): - """ - IMPORTANT: - Custom Panel JS modules are loaded WITHOUT Authorization headers. - Therefore static panel assets must be publicly accessible (no auth). - """ - requires_auth = False - name = "bahmcloud_store_static" - url = "/api/bahmcloud_store_static/{path:.*}" - - async def get(self, request, path): - base = Path(__file__).resolve().parent / "panel" - if not path: - path = "index.html" - - f = (base / path).resolve() - - if not str(f).startswith(str(base)) or not f.exists() or not f.is_file(): - return web.Response(status=404, text="Not found") - - suffix = f.suffix.lower() - if suffix == ".js": - return web.Response(body=f.read_bytes(), content_type="application/javascript") - if suffix == ".css": - return web.Response(body=f.read_bytes(), content_type="text/css") - if suffix in (".html", ".htm"): - return web.Response(body=f.read_bytes(), content_type="text/html") - - return web.Response(body=f.read_bytes(), content_type="application/octet-stream") - - -class _APIView(HomeAssistantView): - """ - Auth-protected API: - GET /api/bahmcloud_store -> list packages - POST /api/bahmcloud_store {op:...} -> install/update a package - """ - requires_auth = True - name = "bahmcloud_store_api" - url = "/api/bahmcloud_store" - - def __init__(self, store: BahmcloudStore) -> None: - self.store = store - - async def get(self, request): - await self.store.refresh() - items: list[dict[str, Any]] = [] - for pkg in self.store.packages.values(): - installed = self.store.is_installed(pkg.domain) - items.append( - { - "id": pkg.id, - "name": pkg.name, - "domain": pkg.domain, - "type": pkg.type, - "installed": installed, - "installed_version": self.store.installed_version(pkg.domain) if installed else None, - "latest_version": pkg.latest_version, - "repo": pkg.repo, - "release_url": pkg.release_url, - } - ) - return self.json({"packages": items, "store_url": self.store.config.store_url}) - - async def post(self, request): - data = await request.json() - op = data.get("op") - package_id = data.get("package_id") - - if op not in ("install", "update"): - return self.json({"error": "unknown op"}, status_code=400) - if not package_id: - return self.json({"error": "package_id missing"}, status_code=400) - - pkg = self.store.packages.get(package_id) - if not pkg: - return self.json({"error": "unknown package_id"}, status_code=404) - - await self.store.install_from_zip(pkg) - return self.json({"ok": True}) + async def remove_custom_repo(self, rid: str) -> None: + repos = await self.list_custom_repos() + repos = [r for r in repos if r.id != rid] + await self._store.async_save({"custom_repos": [r.__dict__ for r in repos]})