From 49aab6952aa5838faca9daa46f94e8e7a9b248d8 Mon Sep 17 00:00:00 2001 From: bahmcloud Date: Wed, 14 Jan 2026 17:10:01 +0000 Subject: [PATCH] =?UTF-8?q?custom=5Fcomponents/bahmcloud=5Fstore/store.py?= =?UTF-8?q?=20hinzugef=C3=BCgt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- custom_components/bahmcloud_store/store.py | 227 +++++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 custom_components/bahmcloud_store/store.py diff --git a/custom_components/bahmcloud_store/store.py b/custom_components/bahmcloud_store/store.py new file mode 100644 index 0000000..1ed2eec --- /dev/null +++ b/custom_components/bahmcloud_store/store.py @@ -0,0 +1,227 @@ +from __future__ import annotations + +import asyncio +import json +import logging +import os +import shutil +import tempfile +import zipfile +from dataclasses import dataclass +from datetime import timedelta +from pathlib import Path +from typing import Any + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.aiohttp_client import async_get_clientsession +from homeassistant.components import persistent_notification + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = "bahmcloud_store" + + +class StoreError(Exception): + pass + + +@dataclass +class StoreConfig: + store_url: str + auto_update: bool + check_interval: timedelta + + +@dataclass +class Package: + id: str + name: str + type: str + domain: str + version: str + zip_url: str + source_path: str + + +class BahmcloudStore: + def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None: + self.hass = hass + self.config = config + self._packages: dict[str, Package] = {} + self._last_error: str | None = None + + self._base = Path(__file__).resolve().parent + self._panel_dir = self._base / "panel" + + def load_panel_file(self, filename: str) -> str: + p = self._panel_dir / filename + return p.read_text(encoding="utf-8") + + async def refresh(self) -> None: + """Fetch store.json and parse packages.""" + session = async_get_clientsession(self.hass) + try: + async with session.get(self.config.store_url, timeout=20) as resp: + if resp.status != 200: + raise StoreError(f"store_url returned {resp.status}") + raw = await resp.text() + except Exception as e: + raise StoreError(f"Failed fetching store index: {e}") from e + + try: + data = json.loads(raw) + packages = data.get("packages", []) + parsed: dict[str, Package] = {} + for p in packages: + pkg = Package( + id=p["id"], + name=p.get("name", p["id"]), + type=p.get("type", "integration"), + domain=p["domain"], + version=str(p.get("version", "0.0.0")), + zip_url=p["zip_url"], + source_path=p["source_path"], + ) + parsed[pkg.id] = pkg + self._packages = parsed + self._last_error = None + except Exception as e: + raise StoreError(f"Invalid store.json: {e}") from e + + def installed_version(self, domain: str) -> str | None: + manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json")) + if not manifest.exists(): + return None + try: + data = json.loads(manifest.read_text(encoding="utf-8")) + return str(data.get("version") or data.get("manifest_version") or "unknown") + except Exception: + return "unknown" + + def is_installed(self, domain: str) -> bool: + return Path(self.hass.config.path("custom_components", domain)).exists() + + def as_dict(self) -> dict[str, Any]: + items = [] + for pkg in self._packages.values(): + installed = self.is_installed(pkg.domain) + inst_ver = self.installed_version(pkg.domain) if installed else None + items.append( + { + "id": pkg.id, + "name": pkg.name, + "type": pkg.type, + "domain": pkg.domain, + "latest_version": pkg.version, + "installed": installed, + "installed_version": inst_ver, + } + ) + return { + "store_url": self.config.store_url, + "auto_update": self.config.auto_update, + "packages": items, + "last_error": self._last_error, + } + + async def install(self, package_id: str) -> None: + pkg = self._packages.get(package_id) + if not pkg: + raise StoreError(f"Unknown package_id: {package_id}") + await self._install_from_zip(pkg, mode="install") + + async def update(self, package_id: str) -> None: + pkg = self._packages.get(package_id) + if not pkg: + raise StoreError(f"Unknown package_id: {package_id}") + await self._install_from_zip(pkg, mode="update") + + async def update_all(self) -> None: + for pkg in self._packages.values(): + if not self.is_installed(pkg.domain): + continue + + inst = self.installed_version(pkg.domain) or "0.0.0" + # simple compare: if different => update (MVP) + if inst != pkg.version: + _LOGGER.info("Updating %s (%s -> %s)", pkg.id, inst, pkg.version) + await self._install_from_zip(pkg, mode="auto-update") + + async def _install_from_zip(self, pkg: Package, mode: str) -> None: + """Download ZIP, extract source_path, copy to custom_components/domain.""" + session = async_get_clientsession(self.hass) + + with tempfile.TemporaryDirectory() as td: + zip_path = Path(td) / "repo.zip" + extract_dir = Path(td) / "extract" + + # Download + try: + async with session.get(pkg.zip_url, timeout=60) as resp: + if resp.status != 200: + raise StoreError(f"zip_url returned {resp.status}") + zip_path.write_bytes(await resp.read()) + except Exception as e: + raise StoreError(f"Failed downloading ZIP: {e}") from e + + # Extract (in executor) + try: + await self.hass.async_add_executor_job(self._extract_zip, zip_path, extract_dir) + except Exception as e: + raise StoreError(f"Failed extracting ZIP: {e}") from e + + # Gitea ZIP hat meist einen Top-Level Ordner (repo-/...) + # source_path muss innerhalb dieses Ordners liegen -> wir suchen es robust. + src = self._find_source_path(extract_dir, pkg.source_path) + if not src: + raise StoreError(f"source_path not found in zip: {pkg.source_path}") + + target = Path(self.hass.config.path("custom_components", pkg.domain)) + + # Replace target atomically-ish + tmp_target = Path(td) / "target_new" + shutil.copytree(src, tmp_target) + + # remove old + if target.exists(): + shutil.rmtree(target) + + shutil.copytree(tmp_target, target) + + persistent_notification.async_create( + self.hass, + ( + f"**{pkg.name}** wurde {mode} installiert/aktualisiert.\n\n" + "Bitte Home Assistant **neu starten**, damit die Änderungen aktiv werden." + ), + title="Bahmcloud Store", + notification_id=f"{DOMAIN}_{pkg.domain}_restart_required", + ) + + @staticmethod + def _extract_zip(zip_path: Path, extract_dir: Path) -> None: + extract_dir.mkdir(parents=True, exist_ok=True) + with zipfile.ZipFile(zip_path, "r") as zf: + zf.extractall(extract_dir) + + @staticmethod + def _find_source_path(extract_root: Path, source_path: str) -> Path | None: + # Try direct: + direct = extract_root / source_path + if direct.exists(): + return direct + + # Search one level down (typical archive root folder) + for child in extract_root.iterdir(): + candidate = child / source_path + if candidate.exists(): + return candidate + + # Fallback: walk + parts = Path(source_path).parts + for p in extract_root.rglob(parts[-1]): + # crude: ensure endswith source_path + if str(p).replace("\\", "/").endswith(source_path): + return p + + return None