from __future__ import annotations

import json
import logging
import shutil
import tempfile
import zipfile
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Any

import aiohttp

from homeassistant.components import persistent_notification
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession

_LOGGER = logging.getLogger(__name__)

DOMAIN = "bahmcloud_store"


class StoreError(Exception):
    """Raised when the store index or a package cannot be fetched or installed."""


@dataclass
class StoreConfig:
    store_url: str
    auto_update: bool
    check_interval: timedelta


@dataclass
class Package:
    id: str
    name: str
    type: str
    domain: str
    version: str
    zip_url: str
    source_path: str


class BahmcloudStore:
    def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None:
        self.hass = hass
        self.config = config
        self._packages: dict[str, Package] = {}
        self._last_error: str | None = None

        self._base = Path(__file__).resolve().parent
        self._panel_dir = self._base / "panel"

    def load_panel_file(self, filename: str) -> str:
        """Return the contents of a file in the bundled panel directory."""
        p = self._panel_dir / filename
        return p.read_text(encoding="utf-8")

    async def refresh(self) -> None:
        """Fetch store.json and parse packages."""
        session = async_get_clientsession(self.hass)
        try:
            async with session.get(
                self.config.store_url, timeout=aiohttp.ClientTimeout(total=20)
            ) as resp:
                if resp.status != 200:
                    raise StoreError(f"store_url returned {resp.status}")
                raw = await resp.text()
        except Exception as e:
            # Record the failure so as_dict() can report it via "last_error".
            self._last_error = f"Failed fetching store index: {e}"
            raise StoreError(self._last_error) from e

        try:
            data = json.loads(raw)
            packages = data.get("packages", [])
            parsed: dict[str, Package] = {}
            for p in packages:
                pkg = Package(
                    id=p["id"],
                    name=p.get("name", p["id"]),
                    type=p.get("type", "integration"),
                    domain=p["domain"],
                    version=str(p.get("version", "0.0.0")),
                    zip_url=p["zip_url"],
                    source_path=p["source_path"],
                )
                parsed[pkg.id] = pkg
            self._packages = parsed
            self._last_error = None
        except Exception as e:
            self._last_error = f"Invalid store.json: {e}"
            raise StoreError(self._last_error) from e

    def installed_version(self, domain: str) -> str | None:
        """Return the installed version, "unknown" if unreadable, or None if absent."""
        manifest = Path(
            self.hass.config.path("custom_components", domain, "manifest.json")
        )
        if not manifest.exists():
            return None
        try:
            data = json.loads(manifest.read_text(encoding="utf-8"))
            return str(data.get("version") or data.get("manifest_version") or "unknown")
        except Exception:
            return "unknown"

    def is_installed(self, domain: str) -> bool:
        return Path(self.hass.config.path("custom_components", domain)).exists()

    def as_dict(self) -> dict[str, Any]:
        """Return the store state as a JSON-serializable dict."""
        items = []
        for pkg in self._packages.values():
            installed = self.is_installed(pkg.domain)
            inst_ver = self.installed_version(pkg.domain) if installed else None
            items.append(
                {
                    "id": pkg.id,
                    "name": pkg.name,
                    "type": pkg.type,
                    "domain": pkg.domain,
                    "latest_version": pkg.version,
                    "installed": installed,
                    "installed_version": inst_ver,
                }
            )
        return {
            "store_url": self.config.store_url,
            "auto_update": self.config.auto_update,
            "packages": items,
            "last_error": self._last_error,
        }

    async def install(self, package_id: str) -> None:
        pkg = self._packages.get(package_id)
        if not pkg:
            raise StoreError(f"Unknown package_id: {package_id}")
        await self._install_from_zip(pkg, mode="install")

    async def update(self, package_id: str) -> None:
        pkg = self._packages.get(package_id)
        if not pkg:
            raise StoreError(f"Unknown package_id: {package_id}")
        await self._install_from_zip(pkg, mode="update")

    async def update_all(self) -> None:
        for pkg in self._packages.values():
            if not self.is_installed(pkg.domain):
                continue
            inst = self.installed_version(pkg.domain) or "0.0.0"
            # simple compare: if different => update (MVP)
            if inst != pkg.version:
                _LOGGER.info("Updating %s (%s -> %s)", pkg.id, inst, pkg.version)
                await self._install_from_zip(pkg, mode="auto-update")

    async def _install_from_zip(self, pkg: Package, mode: str) -> None:
        """Download ZIP, extract source_path, copy to custom_components/domain."""
        session = async_get_clientsession(self.hass)

        with tempfile.TemporaryDirectory() as td:
            zip_path = Path(td) / "repo.zip"
            extract_dir = Path(td) / "extract"

            # Download
            try:
                async with session.get(
                    pkg.zip_url, timeout=aiohttp.ClientTimeout(total=60)
                ) as resp:
                    if resp.status != 200:
                        raise StoreError(f"zip_url returned {resp.status}")
                    payload = await resp.read()
                # Write in the executor so file I/O stays off the event loop.
                await self.hass.async_add_executor_job(zip_path.write_bytes, payload)
            except Exception as e:
                raise StoreError(f"Failed downloading ZIP: {e}") from e

            # Extract (in executor)
            try:
                await self.hass.async_add_executor_job(
                    self._extract_zip, zip_path, extract_dir
                )
            except Exception as e:
                raise StoreError(f"Failed extracting ZIP: {e}") from e

            # Gitea ZIPs usually have a top-level folder (repo-/...).
            # source_path must live inside that folder, so we search for it robustly.
            src = await self.hass.async_add_executor_job(
                self._find_source_path, extract_dir, pkg.source_path
            )
            if not src:
                raise StoreError(f"source_path not found in zip: {pkg.source_path}")

            target = Path(self.hass.config.path("custom_components", pkg.domain))

            # Replace target atomically-ish (in executor: copytree/rmtree block)
            try:
                await self.hass.async_add_executor_job(
                    self._replace_target, src, target, Path(td) / "target_new"
                )
            except Exception as e:
                raise StoreError(f"Failed installing files: {e}") from e

        persistent_notification.async_create(
            self.hass,
            (
                f"**{pkg.name}** was installed/updated (mode: {mode}).\n\n"
                "Please **restart** Home Assistant for the changes to take effect."
            ),
            title="Bahmcloud Store",
            notification_id=f"{DOMAIN}_{pkg.domain}_restart_required",
        )

    @staticmethod
    def _replace_target(src: Path, target: Path, tmp_target: Path) -> None:
        """Stage a copy of src, then swap it in for target (blocking)."""
        shutil.copytree(src, tmp_target)
        # remove old
        if target.exists():
            shutil.rmtree(target)
        shutil.copytree(tmp_target, target)

    @staticmethod
    def _extract_zip(zip_path: Path, extract_dir: Path) -> None:
        extract_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(extract_dir)

    @staticmethod
    def _find_source_path(extract_root: Path, source_path: str) -> Path | None:
        # Normalize separators and strip slashes so an absolute or
        # trailing-slash source_path cannot escape or miss the extract root.
        source_path = source_path.replace("\\", "/").strip("/")

        # Try direct:
        direct = extract_root / source_path
        if direct.exists():
            return direct

        # Search one level down (typical archive root folder)
        for child in extract_root.iterdir():
            if not child.is_dir():
                continue
            candidate = child / source_path
            if candidate.exists():
                return candidate

        # Fallback: walk
        parts = Path(source_path).parts
        if not parts:
            return None
        for p in extract_root.rglob(parts[-1]):
            # crude: ensure endswith source_path
            if str(p).replace("\\", "/").endswith(source_path):
                return p
        return None
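

# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module and not wired in.
#
# update_all() treats any installed version string that differs from the
# store's version as an update (the "MVP" compare), so an older store entry
# would also trigger a reinstall. The helpers below show one possible way to
# compare dotted versions numerically instead. The names _version_key and
# is_newer are hypothetical, and the parsing rules (leading "v" ignored,
# non-numeric parts counted as 0, suffixes such as "-beta1" dropped) are
# assumptions rather than a documented versioning policy of the store.
# ---------------------------------------------------------------------------
import re


def _version_key(version: str) -> tuple[int, ...]:
    """Turn a dotted version such as "1.2.3" into a tuple such as (1, 2, 3)."""
    parts = []
    for part in version.strip().lstrip("vV").split("."):
        # Use only the leading digits of each part; anything else counts as 0.
        match = re.match(r"\d+", part)
        parts.append(int(match.group()) if match else 0)
    return tuple(parts)


def is_newer(candidate: str, installed: str) -> bool:
    """Return True if candidate is numerically greater than installed."""
    a, b = _version_key(candidate), _version_key(installed)
    # Pad with zeros so that "1.2" and "1.2.0" compare as equal.
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a > b


# Possible use inside update_all(), replacing `if inst != pkg.version:`:
#     if is_newer(pkg.version, inst):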