diff --git a/custom_components/bahmcloud_store/store.py b/custom_components/bahmcloud_store/store.py deleted file mode 100644 index 1ed2eec..0000000 --- a/custom_components/bahmcloud_store/store.py +++ /dev/null @@ -1,227 +0,0 @@ -from __future__ import annotations - -import asyncio -import json -import logging -import os -import shutil -import tempfile -import zipfile -from dataclasses import dataclass -from datetime import timedelta -from pathlib import Path -from typing import Any - -from homeassistant.core import HomeAssistant -from homeassistant.helpers.aiohttp_client import async_get_clientsession -from homeassistant.components import persistent_notification - -_LOGGER = logging.getLogger(__name__) - -DOMAIN = "bahmcloud_store" - - -class StoreError(Exception): - pass - - -@dataclass -class StoreConfig: - store_url: str - auto_update: bool - check_interval: timedelta - - -@dataclass -class Package: - id: str - name: str - type: str - domain: str - version: str - zip_url: str - source_path: str - - -class BahmcloudStore: - def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None: - self.hass = hass - self.config = config - self._packages: dict[str, Package] = {} - self._last_error: str | None = None - - self._base = Path(__file__).resolve().parent - self._panel_dir = self._base / "panel" - - def load_panel_file(self, filename: str) -> str: - p = self._panel_dir / filename - return p.read_text(encoding="utf-8") - - async def refresh(self) -> None: - """Fetch store.json and parse packages.""" - session = async_get_clientsession(self.hass) - try: - async with session.get(self.config.store_url, timeout=20) as resp: - if resp.status != 200: - raise StoreError(f"store_url returned {resp.status}") - raw = await resp.text() - except Exception as e: - raise StoreError(f"Failed fetching store index: {e}") from e - - try: - data = json.loads(raw) - packages = data.get("packages", []) - parsed: dict[str, Package] = {} - for p in packages: - pkg = Package( - id=p["id"], - name=p.get("name", p["id"]), - type=p.get("type", "integration"), - domain=p["domain"], - version=str(p.get("version", "0.0.0")), - zip_url=p["zip_url"], - source_path=p["source_path"], - ) - parsed[pkg.id] = pkg - self._packages = parsed - self._last_error = None - except Exception as e: - raise StoreError(f"Invalid store.json: {e}") from e - - def installed_version(self, domain: str) -> str | None: - manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json")) - if not manifest.exists(): - return None - try: - data = json.loads(manifest.read_text(encoding="utf-8")) - return str(data.get("version") or data.get("manifest_version") or "unknown") - except Exception: - return "unknown" - - def is_installed(self, domain: str) -> bool: - return Path(self.hass.config.path("custom_components", domain)).exists() - - def as_dict(self) -> dict[str, Any]: - items = [] - for pkg in self._packages.values(): - installed = self.is_installed(pkg.domain) - inst_ver = self.installed_version(pkg.domain) if installed else None - items.append( - { - "id": pkg.id, - "name": pkg.name, - "type": pkg.type, - "domain": pkg.domain, - "latest_version": pkg.version, - "installed": installed, - "installed_version": inst_ver, - } - ) - return { - "store_url": self.config.store_url, - "auto_update": self.config.auto_update, - "packages": items, - "last_error": self._last_error, - } - - async def install(self, package_id: str) -> None: - pkg = self._packages.get(package_id) - if not pkg: - raise StoreError(f"Unknown package_id: {package_id}") - await self._install_from_zip(pkg, mode="install") - - async def update(self, package_id: str) -> None: - pkg = self._packages.get(package_id) - if not pkg: - raise StoreError(f"Unknown package_id: {package_id}") - await self._install_from_zip(pkg, mode="update") - - async def update_all(self) -> None: - for pkg in self._packages.values(): - if not self.is_installed(pkg.domain): - continue - - inst = self.installed_version(pkg.domain) or "0.0.0" - # simple compare: if different => update (MVP) - if inst != pkg.version: - _LOGGER.info("Updating %s (%s -> %s)", pkg.id, inst, pkg.version) - await self._install_from_zip(pkg, mode="auto-update") - - async def _install_from_zip(self, pkg: Package, mode: str) -> None: - """Download ZIP, extract source_path, copy to custom_components/domain.""" - session = async_get_clientsession(self.hass) - - with tempfile.TemporaryDirectory() as td: - zip_path = Path(td) / "repo.zip" - extract_dir = Path(td) / "extract" - - # Download - try: - async with session.get(pkg.zip_url, timeout=60) as resp: - if resp.status != 200: - raise StoreError(f"zip_url returned {resp.status}") - zip_path.write_bytes(await resp.read()) - except Exception as e: - raise StoreError(f"Failed downloading ZIP: {e}") from e - - # Extract (in executor) - try: - await self.hass.async_add_executor_job(self._extract_zip, zip_path, extract_dir) - except Exception as e: - raise StoreError(f"Failed extracting ZIP: {e}") from e - - # Gitea ZIP hat meist einen Top-Level Ordner (repo-/...) - # source_path muss innerhalb dieses Ordners liegen -> wir suchen es robust. - src = self._find_source_path(extract_dir, pkg.source_path) - if not src: - raise StoreError(f"source_path not found in zip: {pkg.source_path}") - - target = Path(self.hass.config.path("custom_components", pkg.domain)) - - # Replace target atomically-ish - tmp_target = Path(td) / "target_new" - shutil.copytree(src, tmp_target) - - # remove old - if target.exists(): - shutil.rmtree(target) - - shutil.copytree(tmp_target, target) - - persistent_notification.async_create( - self.hass, - ( - f"**{pkg.name}** wurde {mode} installiert/aktualisiert.\n\n" - "Bitte Home Assistant **neu starten**, damit die Änderungen aktiv werden." - ), - title="Bahmcloud Store", - notification_id=f"{DOMAIN}_{pkg.domain}_restart_required", - ) - - @staticmethod - def _extract_zip(zip_path: Path, extract_dir: Path) -> None: - extract_dir.mkdir(parents=True, exist_ok=True) - with zipfile.ZipFile(zip_path, "r") as zf: - zf.extractall(extract_dir) - - @staticmethod - def _find_source_path(extract_root: Path, source_path: str) -> Path | None: - # Try direct: - direct = extract_root / source_path - if direct.exists(): - return direct - - # Search one level down (typical archive root folder) - for child in extract_root.iterdir(): - candidate = child / source_path - if candidate.exists(): - return candidate - - # Fallback: walk - parts = Path(source_path).parts - for p in extract_root.rglob(parts[-1]): - # crude: ensure endswith source_path - if str(p).replace("\\", "/").endswith(source_path): - return p - - return None