"""Bahmcloud Store: package index handling, ZIP installer and HTTP views."""
from __future__ import annotations

import asyncio
import json
import logging
import shutil
import tempfile
import zipfile
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

from aiohttp import ClientTimeout, web

from homeassistant.components import persistent_notification
from homeassistant.components.http import HomeAssistantView
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml

_LOGGER = logging.getLogger(__name__)

DOMAIN = "bahmcloud_store"

# Total timeouts (seconds) for small API/index requests and for ZIP downloads.
_API_TIMEOUT = ClientTimeout(total=20)
_DOWNLOAD_TIMEOUT = ClientTimeout(total=60)

# Content types served by the static panel view, keyed by lower-case suffix.
_CONTENT_TYPES: dict[str, str] = {
    ".js": "application/javascript",
    ".css": "text/css",
    ".html": "text/html",
    ".htm": "text/html",
}
_DEFAULT_CONTENT_TYPE = "application/octet-stream"


class StoreError(Exception):
    """Store error."""


@dataclass
class StoreConfig:
    """Static configuration of the store."""

    # URL of the store index (parsed as YAML by BahmcloudStore.refresh).
    store_url: str


@dataclass
class Package:
    """One package entry of the store index."""

    id: str
    name: str
    type: str  # "integration" | "store"
    domain: str
    repo: str
    owner: str
    repository: str
    branch: str
    source_path: str

    # computed each refresh
    latest_version: str | None = None
    zip_url: str | None = None
    release_url: str | None = None


class BahmcloudStore:
    """Load the store index, resolve latest versions and install packages."""

    def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None:
        self.hass = hass
        self.config = config
        self.packages: dict[str, Package] = {}
        self.refresh_seconds: int = 300
        self._listeners: list[Callable[[], None]] = []

    def add_listener(self, cb: Callable[[], None]) -> None:
        """Register a zero-argument callback for signal_entities_updated()."""
        self._listeners.append(cb)

    def signal_entities_updated(self) -> None:
        """Invoke every registered listener.

        Best effort: a failing listener is logged and does not prevent the
        remaining listeners from running.
        """
        # Iterate over a copy so listeners may register further listeners.
        for cb in list(self._listeners):
            try:
                cb()
            except Exception:
                _LOGGER.exception("Store listener %r raised", cb)

    @staticmethod
    def _zip_url(repo: str, branch: str) -> str:
        """Return the branch archive URL for a repository URL."""
        return f"{repo.rstrip('/')}/archive/{branch}.zip"

    @staticmethod
    def _base_from_repo(repo_url: str) -> str:
        """Return ``scheme://host`` of a repository URL."""
        u = urlparse(repo_url.rstrip("/"))
        return f"{u.scheme}://{u.netloc}"

    @staticmethod
    def _raw_manifest_url(repo: str, branch: str, source_path: str) -> str:
        """Return the raw URL of ``manifest.json`` below ``source_path``."""
        # Example:
        # https://git.bahmcloud.de/bahmcloud/easy_proxmox/raw/branch/main/custom_components/easy_proxmox/manifest.json
        return f"{repo.rstrip('/')}/raw/branch/{branch}/{source_path.rstrip('/')}/manifest.json"

    @staticmethod
    def _is_safe_component(name: Any) -> bool:
        """Return True if ``name`` is usable as a single directory name."""
        return (
            isinstance(name, str)
            and bool(name)
            and name not in (".", "..")
            and "/" not in name
            and "\\" not in name
        )

    async def _get_json(self, url: str, *, check_content_type: bool = True) -> Any:
        """GET ``url`` and return the decoded JSON, or None on any failure.

        Failures (non-200 status, network error, undecodable body) are logged
        at debug level only, because callers use this for optional lookups.
        """
        session = async_get_clientsession(self.hass)
        try:
            async with session.get(url, timeout=_API_TIMEOUT) as resp:
                if resp.status != 200:
                    return None
                if check_content_type:
                    return await resp.json()
                # Parse the body text directly instead of resp.json();
                # presumably raw file endpoints are not served as
                # application/json - TODO confirm.
                return json.loads(await resp.text())
        except Exception as err:
            _LOGGER.debug("Request to %s failed: %s", url, err)
            return None

    async def _fetch_latest_version(self, pkg: Package) -> tuple[str | None, str | None]:
        """Return ``(latest_version, release_url)`` for ``pkg``.

        Strategy:
          1) releases/latest -> tag_name
          2) tags?limit=1 -> first tag name
          3) fallback: read manifest.json from repo (version field)

        Every step is best effort; ``(None, None)`` is returned when none of
        them yields a version. This method does not raise for request errors.
        """
        base = self._base_from_repo(pkg.repo)
        api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}"

        # 1) latest release
        release = await self._get_json(f"{api}/releases/latest")
        if isinstance(release, dict):
            tag = release.get("tag_name")
            if tag:
                html_url = release.get("html_url")
                return (str(tag), str(html_url) if html_url else None)

        # 2) tags fallback
        tags = await self._get_json(f"{api}/tags?limit=1")
        if tags and isinstance(tags, list) and isinstance(tags[0], dict):
            name = tags[0].get("name")
            if name:
                return (str(name), None)

        # 3) fallback: manifest.json version from repo
        manifest = await self._get_json(
            self._raw_manifest_url(pkg.repo, pkg.branch, pkg.source_path),
            check_content_type=False,
        )
        if isinstance(manifest, dict):
            ver = manifest.get("version")
            if ver:
                return (str(ver), None)

        return (None, None)

    async def refresh(self) -> None:
        """Download the store index and rebuild ``self.packages``.

        Raises:
            StoreError: if the index cannot be fetched or is not valid.
        """
        session = async_get_clientsession(self.hass)

        try:
            async with session.get(self.config.store_url, timeout=_API_TIMEOUT) as resp:
                if resp.status != 200:
                    raise StoreError(f"store_url returned {resp.status}")
                raw = await resp.text()
        except Exception as e:
            raise StoreError(f"Failed fetching store index: {e}") from e

        try:
            data = ha_yaml.parse_yaml(raw)
            if not isinstance(data, dict):
                raise StoreError("store.yaml must be a mapping")

            self.refresh_seconds = int(data.get("refresh_seconds", 300))

            # An empty "packages:" key parses to None; treat it as no packages.
            entries = data.get("packages") or []
            parsed: dict[str, Package] = {}
            for p in entries:
                pkg = Package(
                    id=p["id"],
                    name=p.get("name", p["id"]),
                    type=p.get("type", "integration"),
                    domain=p["domain"],
                    repo=p["repo"],
                    owner=p["owner"],
                    repository=p["repository"],
                    branch=p.get("branch", "main"),
                    source_path=p["source_path"],
                )
                pkg.zip_url = self._zip_url(pkg.repo, pkg.branch)
                parsed[pkg.id] = pkg

            # Compute latest versions; the lookups are independent of each
            # other, so run them concurrently.
            versions = await asyncio.gather(
                *(self._fetch_latest_version(pkg) for pkg in parsed.values())
            )
            for pkg, (latest, rel_url) in zip(parsed.values(), versions):
                pkg.latest_version = latest or "unknown"
                pkg.release_url = rel_url

            self.packages = parsed
        except Exception as e:
            raise StoreError(f"Invalid store.yaml: {e}") from e

    def installed_version(self, domain: str) -> str | None:
        """Return the version from the installed manifest.

        Returns None when no manifest.json exists for ``domain`` and
        ``"unknown"`` when it exists but has no readable version.
        Performs blocking file I/O.
        """
        manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json"))
        if not manifest.exists():
            return None
        try:
            data = json.loads(manifest.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both invalid JSON and invalid UTF-8.
            return "unknown"
        if not isinstance(data, dict):
            return "unknown"
        return str(data.get("version") or "unknown")

    def is_installed(self, domain: str) -> bool:
        """Return True if ``custom_components/<domain>`` exists."""
        return Path(self.hass.config.path("custom_components", domain)).exists()

    async def install_from_zip(self, pkg: Package) -> None:
        """Manual install/update: download ZIP and copy source_path into /config/custom_components/.

        Raises:
            StoreError: if ``zip_url`` is unset, the domain is not a plain
                directory name, the download does not return 200, or
                ``source_path`` is missing/invalid inside the archive.
        """
        if not pkg.zip_url:
            raise StoreError("zip_url not set")
        # The domain comes from the remote index and names the directory that
        # gets deleted and replaced, so it must not contain path separators
        # or be "."/"..".
        if not self._is_safe_component(pkg.domain):
            raise StoreError(f"invalid domain: {pkg.domain!r}")

        session = async_get_clientsession(self.hass)
        async with session.get(pkg.zip_url, timeout=_DOWNLOAD_TIMEOUT) as resp:
            if resp.status != 200:
                raise StoreError(f"zip_url returned {resp.status}")
            payload = await resp.read()

        target = Path(self.hass.config.path("custom_components", pkg.domain))
        # All filesystem work is blocking, so keep it off the event loop.
        await self.hass.async_add_executor_job(
            self._install_payload, payload, pkg.source_path, target
        )

        # After installation: rebuild entities (so it shows up as an update)
        self.signal_entities_updated()

        persistent_notification.async_create(
            self.hass,
            (
                f"**{pkg.name}** wurde installiert/aktualisiert.\n\n"
                "Bitte Home Assistant **neu starten**, damit die Änderungen aktiv werden."
            ),
            title="Bahmcloud Store",
            notification_id=f"{DOMAIN}_{pkg.domain}_restart_required",
        )

    @classmethod
    def _install_payload(cls, payload: bytes, source_path: str, target: Path) -> None:
        """Unpack ``payload`` and replace ``target`` with its ``source_path``.

        Blocking; intended to run in an executor. The existing ``target`` is
        only removed after the source directory has been validated.
        """
        with tempfile.TemporaryDirectory() as td:
            zip_path = Path(td) / "repo.zip"
            extract_dir = Path(td) / "extract"
            zip_path.write_bytes(payload)
            cls._extract_zip(zip_path, extract_dir)

            src = cls._find_source_path(extract_dir, source_path)
            if not src:
                raise StoreError(f"source_path not found in zip: {source_path}")

            # Reject a source_path that resolves outside the extracted
            # archive (e.g. one containing "..").
            try:
                src.resolve().relative_to(extract_dir.resolve())
            except ValueError:
                raise StoreError(f"source_path escapes archive: {source_path}") from None
            if not src.is_dir():
                raise StoreError(f"source_path is not a directory: {source_path}")

            if target.exists():
                shutil.rmtree(target)
            shutil.copytree(src, target)

    @staticmethod
    def _extract_zip(zip_path: Path, extract_dir: Path) -> None:
        """Extract ``zip_path`` into ``extract_dir`` (created if missing)."""
        extract_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path, "r") as zf:
            # Per the zipfile documentation, extractall() strips absolute
            # prefixes and ".." components from member names.
            zf.extractall(extract_dir)

    @staticmethod
    def _find_source_path(extract_root: Path, source_path: str) -> Path | None:
        """Locate ``source_path`` in the extracted tree.

        Checks ``extract_root`` itself first, then each of its direct
        children. Returns None when nothing matches.
        """
        direct = extract_root / source_path
        if direct.exists():
            return direct

        for child in extract_root.iterdir():
            candidate = child / source_path
            if candidate.exists():
                return candidate
        return None

    async def register_http_views(self) -> None:
        """Register HTTP views for static panel assets and JSON API."""
        self.hass.http.register_view(_StaticView())
        self.hass.http.register_view(_APIView(self))


class _StaticView(HomeAssistantView):
    """
    IMPORTANT:
    Custom Panel JS modules are loaded WITHOUT Authorization headers.
    Therefore static panel assets must be publicly accessible (no auth).
    """

    requires_auth = False
    name = "bahmcloud_store_static"
    url = "/api/bahmcloud_store_static/{path:.*}"

    @staticmethod
    def _load_asset(path: str) -> tuple[bytes, str] | None:
        """Return ``(content, content_type)`` for a panel asset, or None.

        None is returned when ``path`` resolves outside the ``panel``
        directory or does not name a regular file. Blocking.
        """
        base = (Path(__file__).resolve().parent / "panel").resolve()
        try:
            asset = (base / (path or "index.html")).resolve()
            # A string-prefix comparison would also accept sibling
            # directories such as "panel_x"; relative_to() compares whole
            # path components.
            asset.relative_to(base)
        except (OSError, ValueError):
            return None
        if not asset.is_file():
            return None
        content_type = _CONTENT_TYPES.get(asset.suffix.lower(), _DEFAULT_CONTENT_TYPE)
        return asset.read_bytes(), content_type

    async def get(self, request, path):
        """Serve a file from the ``panel`` directory; 404 if unavailable."""
        loop = asyncio.get_running_loop()
        # File access is blocking, so run it in the loop's default executor.
        asset = await loop.run_in_executor(None, self._load_asset, path)
        if asset is None:
            return web.Response(status=404, text="Not found")
        body, content_type = asset
        return web.Response(body=body, content_type=content_type)


class _APIView(HomeAssistantView):
    """
    Auth-protected API:
    GET  /api/bahmcloud_store -> list packages
    POST /api/bahmcloud_store {op:...} -> install/update a package
    """

    requires_auth = True
    name = "bahmcloud_store_api"
    url = "/api/bahmcloud_store"

    def __init__(self, store: BahmcloudStore) -> None:
        self.store = store

    def _collect_items(self) -> list[dict[str, Any]]:
        """Build the package list payload. Blocking (probes the filesystem)."""
        items: list[dict[str, Any]] = []
        for pkg in self.store.packages.values():
            installed = self.store.is_installed(pkg.domain)
            items.append(
                {
                    "id": pkg.id,
                    "name": pkg.name,
                    "domain": pkg.domain,
                    "type": pkg.type,
                    "installed": installed,
                    "installed_version": self.store.installed_version(pkg.domain) if installed else None,
                    "latest_version": pkg.latest_version,
                    "repo": pkg.repo,
                    "release_url": pkg.release_url,
                }
            )
        return items

    async def get(self, request):
        """Refresh the store index and return all packages as JSON."""
        try:
            await self.store.refresh()
        except StoreError as err:
            _LOGGER.warning("Store refresh failed: %s", err)
            return self.json({"error": str(err)}, status_code=500)

        items = await self.store.hass.async_add_executor_job(self._collect_items)
        return self.json({"packages": items, "store_url": self.store.config.store_url})

    async def post(self, request):
        """Install or update the package named by ``package_id``.

        Expects a JSON object with ``op`` ("install" or "update") and
        ``package_id``. Responds with ``{"ok": true}`` on success and
        ``{"error": ...}`` with a 4xx/5xx status otherwise.
        """
        try:
            data = await request.json()
        except ValueError:
            return self.json({"error": "invalid JSON body"}, status_code=400)
        if not isinstance(data, dict):
            return self.json({"error": "invalid JSON body"}, status_code=400)

        op = data.get("op")
        package_id = data.get("package_id")

        if op not in ("install", "update"):
            return self.json({"error": "unknown op"}, status_code=400)
        if not package_id:
            return self.json({"error": "package_id missing"}, status_code=400)

        try:
            pkg = self.store.packages.get(package_id)
        except TypeError:
            # Unhashable package_id (e.g. a JSON list/object) cannot match.
            pkg = None
        if not pkg:
            return self.json({"error": "unknown package_id"}, status_code=404)

        try:
            await self.store.install_from_zip(pkg)
        except StoreError as err:
            _LOGGER.warning("Installing %s failed: %s", pkg.id, err)
            return self.json({"error": str(err)}, status_code=500)
        except Exception:
            # Top-level API boundary: log the traceback, return a JSON error.
            _LOGGER.exception("Installing %s failed unexpectedly", pkg.id)
            return self.json({"error": "install failed"}, status_code=500)
        return self.json({"ok": True})