custom_components/bahmcloud_store/store.py aktualisiert

This commit is contained in:
2026-01-15 14:31:58 +00:00
parent 1837ed4a13
commit 2c50765d66

View File

@@ -1,338 +1,52 @@
from __future__ import annotations
import json
import logging
import shutil
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from aiohttp import web
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.components import persistent_notification
from homeassistant.util import yaml as ha_yaml
from homeassistant.components.http import HomeAssistantView
from homeassistant.helpers.storage import Store
_LOGGER = logging.getLogger(__name__)
DOMAIN = "bahmcloud_store"
class StoreError(Exception):
"""Store error."""
STORAGE_KEY = "bahmcloud_store"
STORAGE_VERSION = 1
@dataclass
class StoreConfig:
store_url: str
@dataclass
class Package:
class CustomRepo:
id: str
name: str
type: str # "integration" | "store"
domain: str
repo: str
owner: str
repository: str
branch: str
source_path: str
# computed each refresh
latest_version: str | None = None
zip_url: str | None = None
release_url: str | None = None
url: str
name: str | None = None
class BahmcloudStore:
def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None:
self.hass = hass
self.config = config
self.packages: dict[str, Package] = {}
self.refresh_seconds: int = 300
self._listeners: list[callable] = []
class BCSStorage:
def __init__(self, hass: HomeAssistant) -> None:
self._store = Store(hass, STORAGE_VERSION, STORAGE_KEY)
def add_listener(self, cb) -> None:
self._listeners.append(cb)
async def list_custom_repos(self) -> list[CustomRepo]:
data = await self._store.async_load()
if not isinstance(data, dict):
return []
items = data.get("custom_repos", [])
if not isinstance(items, list):
return []
def signal_entities_updated(self) -> None:
for cb in list(self._listeners):
try:
cb()
except Exception:
pass
out: list[CustomRepo] = []
for it in items:
if not isinstance(it, dict):
continue
rid = str(it.get("id") or "").strip()
url = str(it.get("url") or "").strip()
name = it.get("name")
name = str(name).strip() if isinstance(name, str) and name.strip() else None
if rid and url:
out.append(CustomRepo(id=rid, url=url, name=name))
return out
@staticmethod
def _zip_url(repo: str, branch: str) -> str:
return f"{repo.rstrip('/')}/archive/{branch}.zip"
async def add_custom_repo(self, rid: str, url: str, name: str | None) -> None:
repos = await self.list_custom_repos()
if any(r.id == rid for r in repos):
return
repos.append(CustomRepo(id=rid, url=url, name=name))
await self._store.async_save({"custom_repos": [r.__dict__ for r in repos]})
@staticmethod
def _base_from_repo(repo_url: str) -> str:
u = urlparse(repo_url.rstrip("/"))
return f"{u.scheme}://{u.netloc}"
@staticmethod
def _raw_manifest_url(repo: str, branch: str, source_path: str) -> str:
# Example:
# https://git.bahmcloud.de/bahmcloud/easy_proxmox/raw/branch/main/custom_components/easy_proxmox/manifest.json
return f"{repo.rstrip('/')}/raw/branch/{branch}/{source_path.rstrip('/')}/manifest.json"
async def _fetch_latest_version(self, pkg: Package) -> tuple[str | None, str | None]:
"""
Returns (latest_version, release_url)
Strategy:
1) releases/latest -> tag_name
2) tags?limit=1 -> first tag name
3) fallback: read manifest.json from repo (version field)
"""
session = async_get_clientsession(self.hass)
base = self._base_from_repo(pkg.repo)
# 1) latest release
latest_release_api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}/releases/latest"
try:
async with session.get(latest_release_api, timeout=20) as resp:
if resp.status == 200:
data = await resp.json()
tag = data.get("tag_name")
html_url = data.get("html_url")
if tag:
return (str(tag), str(html_url) if html_url else None)
except Exception:
pass
# 2) tags fallback
tags_api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}/tags?limit=1"
try:
async with session.get(tags_api, timeout=20) as resp:
if resp.status == 200:
tags = await resp.json()
if tags and isinstance(tags, list):
name = tags[0].get("name")
if name:
return (str(name), None)
except Exception:
pass
# 3) fallback: manifest.json version from repo
try:
manifest_url = self._raw_manifest_url(pkg.repo, pkg.branch, pkg.source_path)
async with session.get(manifest_url, timeout=20) as resp:
if resp.status == 200:
text = await resp.text()
data = json.loads(text)
ver = data.get("version")
if ver:
return (str(ver), None)
except Exception:
pass
return (None, None)
async def refresh(self) -> None:
session = async_get_clientsession(self.hass)
try:
async with session.get(self.config.store_url, timeout=20) as resp:
if resp.status != 200:
raise StoreError(f"store_url returned {resp.status}")
raw = await resp.text()
except Exception as e:
raise StoreError(f"Failed fetching store index: {e}") from e
try:
data = ha_yaml.parse_yaml(raw)
if not isinstance(data, dict):
raise StoreError("store.yaml must be a mapping")
self.refresh_seconds = int(data.get("refresh_seconds", 300))
pkgs = data.get("packages", [])
parsed: dict[str, Package] = {}
for p in pkgs:
pkg = Package(
id=p["id"],
name=p.get("name", p["id"]),
type=p.get("type", "integration"),
domain=p["domain"],
repo=p["repo"],
owner=p["owner"],
repository=p["repository"],
branch=p.get("branch", "main"),
source_path=p["source_path"],
)
pkg.zip_url = self._zip_url(pkg.repo, pkg.branch)
parsed[pkg.id] = pkg
# compute latest versions
for pkg in parsed.values():
latest, rel_url = await self._fetch_latest_version(pkg)
pkg.latest_version = latest or "unknown"
pkg.release_url = rel_url
self.packages = parsed
except Exception as e:
raise StoreError(f"Invalid store.yaml: {e}") from e
def installed_version(self, domain: str) -> str | None:
manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json"))
if not manifest.exists():
return None
try:
data = json.loads(manifest.read_text(encoding="utf-8"))
return str(data.get("version") or "unknown")
except Exception:
return "unknown"
def is_installed(self, domain: str) -> bool:
return Path(self.hass.config.path("custom_components", domain)).exists()
async def install_from_zip(self, pkg: Package) -> None:
"""Manual install/update: download ZIP and copy source_path into /config/custom_components/<domain>."""
if not pkg.zip_url:
raise StoreError("zip_url not set")
session = async_get_clientsession(self.hass)
with tempfile.TemporaryDirectory() as td:
zip_path = Path(td) / "repo.zip"
extract_dir = Path(td) / "extract"
async with session.get(pkg.zip_url, timeout=60) as resp:
if resp.status != 200:
raise StoreError(f"zip_url returned {resp.status}")
zip_path.write_bytes(await resp.read())
await self.hass.async_add_executor_job(self._extract_zip, zip_path, extract_dir)
src = self._find_source_path(extract_dir, pkg.source_path)
if not src:
raise StoreError(f"source_path not found in zip: {pkg.source_path}")
target = Path(self.hass.config.path("custom_components", pkg.domain))
if target.exists():
shutil.rmtree(target)
shutil.copytree(src, target)
# Nach Installation: Entities neu aufbauen (damit es als Update auftaucht)
self.signal_entities_updated()
persistent_notification.async_create(
self.hass,
(
f"**{pkg.name}** wurde installiert/aktualisiert.\n\n"
"Bitte Home Assistant **neu starten**, damit die Änderungen aktiv werden."
),
title="Bahmcloud Store",
notification_id=f"{DOMAIN}_{pkg.domain}_restart_required",
)
@staticmethod
def _extract_zip(zip_path: Path, extract_dir: Path) -> None:
extract_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
@staticmethod
def _find_source_path(extract_root: Path, source_path: str) -> Path | None:
direct = extract_root / source_path
if direct.exists():
return direct
for child in extract_root.iterdir():
candidate = child / source_path
if candidate.exists():
return candidate
return None
async def register_http_views(self) -> None:
"""Register HTTP views for static panel assets and JSON API."""
self.hass.http.register_view(_StaticView())
self.hass.http.register_view(_APIView(self))
class _StaticView(HomeAssistantView):
"""
IMPORTANT:
Custom Panel JS modules are loaded WITHOUT Authorization headers.
Therefore static panel assets must be publicly accessible (no auth).
"""
requires_auth = False
name = "bahmcloud_store_static"
url = "/api/bahmcloud_store_static/{path:.*}"
async def get(self, request, path):
base = Path(__file__).resolve().parent / "panel"
if not path:
path = "index.html"
f = (base / path).resolve()
if not str(f).startswith(str(base)) or not f.exists() or not f.is_file():
return web.Response(status=404, text="Not found")
suffix = f.suffix.lower()
if suffix == ".js":
return web.Response(body=f.read_bytes(), content_type="application/javascript")
if suffix == ".css":
return web.Response(body=f.read_bytes(), content_type="text/css")
if suffix in (".html", ".htm"):
return web.Response(body=f.read_bytes(), content_type="text/html")
return web.Response(body=f.read_bytes(), content_type="application/octet-stream")
class _APIView(HomeAssistantView):
"""
Auth-protected API:
GET /api/bahmcloud_store -> list packages
POST /api/bahmcloud_store {op:...} -> install/update a package
"""
requires_auth = True
name = "bahmcloud_store_api"
url = "/api/bahmcloud_store"
def __init__(self, store: BahmcloudStore) -> None:
self.store = store
async def get(self, request):
await self.store.refresh()
items: list[dict[str, Any]] = []
for pkg in self.store.packages.values():
installed = self.store.is_installed(pkg.domain)
items.append(
{
"id": pkg.id,
"name": pkg.name,
"domain": pkg.domain,
"type": pkg.type,
"installed": installed,
"installed_version": self.store.installed_version(pkg.domain) if installed else None,
"latest_version": pkg.latest_version,
"repo": pkg.repo,
"release_url": pkg.release_url,
}
)
return self.json({"packages": items, "store_url": self.store.config.store_url})
async def post(self, request):
data = await request.json()
op = data.get("op")
package_id = data.get("package_id")
if op not in ("install", "update"):
return self.json({"error": "unknown op"}, status_code=400)
if not package_id:
return self.json({"error": "package_id missing"}, status_code=400)
pkg = self.store.packages.get(package_id)
if not pkg:
return self.json({"error": "unknown package_id"}, status_code=404)
await self.store.install_from_zip(pkg)
return self.json({"ok": True})
async def remove_custom_repo(self, rid: str) -> None:
repos = await self.list_custom_repos()
repos = [r for r in repos if r.id != rid]
await self._store.async_save({"custom_repos": [r.__dict__ for r in repos]})