custom_components/bahmcloud_store/store.py deleted

This commit is contained in:
2026-01-14 17:30:49 +00:00
parent 0abaeba622
commit e6d0005483


@@ -1,227 +0,0 @@
from __future__ import annotations

import asyncio
import json
import logging
import os
import shutil
import tempfile
import zipfile
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from typing import Any

from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.components import persistent_notification

_LOGGER = logging.getLogger(__name__)

DOMAIN = "bahmcloud_store"


class StoreError(Exception):
    pass


@dataclass
class StoreConfig:
    store_url: str
    auto_update: bool
    check_interval: timedelta


@dataclass
class Package:
    id: str
    name: str
    type: str
    domain: str
    version: str
    zip_url: str
    source_path: str


class BahmcloudStore:
    def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None:
        self.hass = hass
        self.config = config
        self._packages: dict[str, Package] = {}
        self._last_error: str | None = None
        self._base = Path(__file__).resolve().parent
        self._panel_dir = self._base / "panel"

    def load_panel_file(self, filename: str) -> str:
        p = self._panel_dir / filename
        return p.read_text(encoding="utf-8")

    async def refresh(self) -> None:
        """Fetch store.json and parse packages."""
        session = async_get_clientsession(self.hass)
        try:
            async with session.get(self.config.store_url, timeout=20) as resp:
                if resp.status != 200:
                    raise StoreError(f"store_url returned {resp.status}")
                raw = await resp.text()
        except Exception as e:
            raise StoreError(f"Failed fetching store index: {e}") from e
        try:
            data = json.loads(raw)
            packages = data.get("packages", [])
            parsed: dict[str, Package] = {}
            for p in packages:
                pkg = Package(
                    id=p["id"],
                    name=p.get("name", p["id"]),
                    type=p.get("type", "integration"),
                    domain=p["domain"],
                    version=str(p.get("version", "0.0.0")),
                    zip_url=p["zip_url"],
                    source_path=p["source_path"],
                )
                parsed[pkg.id] = pkg
            self._packages = parsed
            self._last_error = None
        except Exception as e:
            raise StoreError(f"Invalid store.json: {e}") from e

    def installed_version(self, domain: str) -> str | None:
        manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json"))
        if not manifest.exists():
            return None
        try:
            data = json.loads(manifest.read_text(encoding="utf-8"))
            return str(data.get("version") or data.get("manifest_version") or "unknown")
        except Exception:
            return "unknown"

    def is_installed(self, domain: str) -> bool:
        return Path(self.hass.config.path("custom_components", domain)).exists()

    def as_dict(self) -> dict[str, Any]:
        items = []
        for pkg in self._packages.values():
            installed = self.is_installed(pkg.domain)
            inst_ver = self.installed_version(pkg.domain) if installed else None
            items.append(
                {
                    "id": pkg.id,
                    "name": pkg.name,
                    "type": pkg.type,
                    "domain": pkg.domain,
                    "latest_version": pkg.version,
                    "installed": installed,
                    "installed_version": inst_ver,
                }
            )
        return {
            "store_url": self.config.store_url,
            "auto_update": self.config.auto_update,
            "packages": items,
            "last_error": self._last_error,
        }
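    # For reference, each entry in the "packages" list returned by as_dict() has the
    # following shape (values here are illustrative, not taken from the original file):
    #
    #     {
    #         "id": "example_integration",
    #         "name": "Example Integration",
    #         "type": "integration",
    #         "domain": "example_integration",
    #         "latest_version": "1.2.0",
    #         "installed": True,
    #         "installed_version": "1.1.0",
    #     }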
    async def install(self, package_id: str) -> None:
        pkg = self._packages.get(package_id)
        if not pkg:
            raise StoreError(f"Unknown package_id: {package_id}")
        await self._install_from_zip(pkg, mode="install")

    async def update(self, package_id: str) -> None:
        pkg = self._packages.get(package_id)
        if not pkg:
            raise StoreError(f"Unknown package_id: {package_id}")
        await self._install_from_zip(pkg, mode="update")

    async def update_all(self) -> None:
        for pkg in self._packages.values():
            if not self.is_installed(pkg.domain):
                continue
            inst = self.installed_version(pkg.domain) or "0.0.0"
            # Simple comparison (MVP): any difference triggers an update; see the
            # version-aware sketch after this method.
            if inst != pkg.version:
                _LOGGER.info("Updating %s (%s -> %s)", pkg.id, inst, pkg.version)
                await self._install_from_zip(pkg, mode="auto-update")
    async def _install_from_zip(self, pkg: Package, mode: str) -> None:
        """Download ZIP, extract source_path, copy to custom_components/domain."""
        session = async_get_clientsession(self.hass)
        with tempfile.TemporaryDirectory() as td:
            zip_path = Path(td) / "repo.zip"
            extract_dir = Path(td) / "extract"
            # Download
            try:
                async with session.get(pkg.zip_url, timeout=60) as resp:
                    if resp.status != 200:
                        raise StoreError(f"zip_url returned {resp.status}")
                    zip_path.write_bytes(await resp.read())
            except Exception as e:
                raise StoreError(f"Failed downloading ZIP: {e}") from e
            # Extract (in executor)
            try:
                await self.hass.async_add_executor_job(self._extract_zip, zip_path, extract_dir)
            except Exception as e:
                raise StoreError(f"Failed extracting ZIP: {e}") from e
            # Gitea ZIP archives usually contain a single top-level folder (repo-<hash>/...),
            # and source_path lives inside that folder, so we search for it robustly.
            src = self._find_source_path(extract_dir, pkg.source_path)
            if not src:
                raise StoreError(f"source_path not found in zip: {pkg.source_path}")
            target = Path(self.hass.config.path("custom_components", pkg.domain))
            # Replace the target atomically-ish: stage a fresh copy, then swap it in.
            tmp_target = Path(td) / "target_new"

            def _swap_into_place() -> None:
                shutil.copytree(src, tmp_target)
                # Remove the old installation before copying the new one in.
                if target.exists():
                    shutil.rmtree(target)
                shutil.copytree(tmp_target, target)

            # The copy/remove calls are blocking, so run them in the executor like the
            # ZIP extraction above instead of on the event loop.
            await self.hass.async_add_executor_job(_swap_into_place)
            persistent_notification.async_create(
                self.hass,
                (
                    f"**{pkg.name}** has been installed/updated ({mode}).\n\n"
                    "Please **restart** Home Assistant for the changes to take effect."
                ),
                title="Bahmcloud Store",
                notification_id=f"{DOMAIN}_{pkg.domain}_restart_required",
            )

    @staticmethod
    def _extract_zip(zip_path: Path, extract_dir: Path) -> None:
        extract_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(extract_dir)

    @staticmethod
    def _find_source_path(extract_root: Path, source_path: str) -> Path | None:
        # Try direct:
        direct = extract_root / source_path
        if direct.exists():
            return direct
        # Search one level down (typical archive root folder)
        for child in extract_root.iterdir():
            candidate = child / source_path
            if candidate.exists():
                return candidate
        # Fallback: walk the tree and match on the full source_path suffix.
        parts = Path(source_path).parts
        for p in extract_root.rglob(parts[-1]):
            # Crude check: ensure the path ends with source_path.
            if str(p).replace("\\", "/").endswith(source_path):
                return p
        return None
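For context, refresh() above expects the document at store_url to be JSON with a top-level "packages" list. A minimal index the parser would accept could look like the sketch below; the field names come from the parsing code, while the id, domain and URLs are made-up placeholders rather than values from the original repository:

    EXAMPLE_STORE_INDEX = {
        "packages": [
            {
                "id": "example_integration",
                "name": "Example Integration",
                "type": "integration",
                "domain": "example_integration",
                "version": "1.2.0",
                "zip_url": "https://git.example.com/owner/repo/archive/main.zip",
                "source_path": "custom_components/example_integration",
            }
        ]
    }

Only "id", "domain", "zip_url" and "source_path" are required; "name", "type" and "version" fall back to the package id, "integration" and "0.0.0" respectively.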