# File: bahmcloud_store/custom_components/bahmcloud_store/store.py
# 339 lines, 12 KiB, Python

from __future__ import annotations

import asyncio
import json
import logging
import shutil
import tempfile
import zipfile
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

from aiohttp import web
from homeassistant.components import persistent_notification
from homeassistant.components.http import HomeAssistantView
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Integration domain; also used as the prefix of persistent-notification ids.
DOMAIN = "bahmcloud_store"
class StoreError(Exception):
    """Raised when the store index or a package cannot be fetched, parsed or installed."""
@dataclass
class StoreConfig:
    """Static configuration handed to the store."""

    # URL of the store index; the store fetches it via HTTP GET and parses
    # the response body as YAML.
    store_url: str
@dataclass
class Package:
    """One entry of the store index plus version data computed on refresh."""

    # --- fields read from the index entry ---
    id: str  # unique key of the package within the store
    name: str  # display name (the index loader falls back to the id)
    type: str  # "integration" | "store"
    domain: str  # directory name below custom_components
    repo: str  # repository base URL
    owner: str  # owner segment of the repository API path
    repository: str  # repository segment of the repository API path
    branch: str  # branch used for the archive and raw-file URLs
    source_path: str  # path inside the repository that gets installed

    # --- computed each refresh ---
    latest_version: str | None = None
    zip_url: str | None = None
    release_url: str | None = None
class BahmcloudStore:
    """Load the store index and install the packages it lists.

    The index is fetched from ``config.store_url`` and parsed as YAML. Each
    package entry points at a repository whose host is queried through its
    ``/api/v1/repos/...`` endpoints for version information and whose branch
    archive (``<repo>/archive/<branch>.zip``) is downloaded for installation
    into ``<config>/custom_components/<domain>``.
    """

    def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None:
        """Create an empty store; call :meth:`refresh` to load packages."""
        self.hass = hass
        self.config = config
        # Packages from the most recent successful refresh, keyed by package id.
        self.packages: dict[str, Package] = {}
        # Refresh interval advertised by the index (``refresh_seconds``).
        self.refresh_seconds: int = 300
        # Zero-argument callbacks invoked by signal_entities_updated().
        self._listeners: list[Callable[[], None]] = []

    def add_listener(self, cb: Callable[[], None]) -> None:
        """Register *cb* to be called by :meth:`signal_entities_updated`."""
        self._listeners.append(cb)

    def signal_entities_updated(self) -> None:
        """Call every registered listener; a failing listener is logged and skipped."""
        # Iterate over a copy so listeners may register further listeners.
        for cb in list(self._listeners):
            try:
                cb()
            except Exception:  # one broken listener must not block the others
                _LOGGER.exception("Store listener %r raised", cb)

    @staticmethod
    def _zip_url(repo: str, branch: str) -> str:
        """Return the URL of the branch archive for *repo*."""
        return f"{repo.rstrip('/')}/archive/{branch}.zip"

    @staticmethod
    def _base_from_repo(repo_url: str) -> str:
        """Return ``scheme://netloc`` of *repo_url* (the repository host)."""
        u = urlparse(repo_url.rstrip("/"))
        return f"{u.scheme}://{u.netloc}"

    @staticmethod
    def _raw_manifest_url(repo: str, branch: str, source_path: str) -> str:
        """Return the raw-file URL of ``manifest.json`` below *source_path*."""
        # Example:
        # https://git.bahmcloud.de/bahmcloud/easy_proxmox/raw/branch/main/custom_components/easy_proxmox/manifest.json
        return f"{repo.rstrip('/')}/raw/branch/{branch}/{source_path.rstrip('/')}/manifest.json"

    @staticmethod
    def _is_valid_domain(domain: Any) -> bool:
        """Return True when *domain* is a plain, single directory name.

        The domain comes from the remote index and is used as a directory
        below ``custom_components`` that gets deleted and re-created, so
        separators, ``..`` and absolute paths must be rejected.
        """
        return (
            isinstance(domain, str)
            and domain not in ("", "..")
            and "\\" not in domain
            and Path(domain).name == domain
        )

    @staticmethod
    async def _get_json(session: Any, url: str) -> Any:
        """GET *url* and return the decoded JSON body, or None on any failure.

        Best-effort helper for version lookup: non-200 responses, network
        errors and undecodable bodies all yield None (logged at debug level).
        """
        try:
            async with session.get(url, timeout=20) as resp:
                if resp.status != 200:
                    return None
                # Decode from text so that raw-file responses, which need not
                # be served with a JSON content type, are accepted as well.
                return json.loads(await resp.text())
        except Exception as err:  # best-effort lookup, never fatal
            _LOGGER.debug("Version lookup request to %s failed: %s", url, err)
            return None

    async def _fetch_latest_version(self, pkg: Package) -> tuple[str | None, str | None]:
        """Return ``(latest_version, release_url)`` for *pkg*.

        Strategy, first hit wins:
          1) ``releases/latest`` -> ``tag_name`` (``html_url`` as release URL)
          2) ``tags?limit=1`` -> name of the first tag
          3) fallback: ``manifest.json`` in the repository (``version`` field)

        Request and decoding failures are not raised; ``(None, None)`` is
        returned when no strategy yields a version.
        """
        session = async_get_clientsession(self.hass)
        base = self._base_from_repo(pkg.repo)
        api_root = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}"

        # 1) latest release
        release = await self._get_json(session, f"{api_root}/releases/latest")
        if isinstance(release, dict) and release.get("tag_name"):
            html_url = release.get("html_url")
            return (str(release["tag_name"]), str(html_url) if html_url else None)

        # 2) tags fallback
        tags = await self._get_json(session, f"{api_root}/tags?limit=1")
        if isinstance(tags, list) and tags and isinstance(tags[0], dict):
            name = tags[0].get("name")
            if name:
                return (str(name), None)

        # 3) fallback: manifest.json version from repo
        try:
            manifest_url = self._raw_manifest_url(pkg.repo, pkg.branch, pkg.source_path)
        except AttributeError:
            # source_path from the index was not a string; nothing to look up.
            return (None, None)
        manifest = await self._get_json(session, manifest_url)
        if isinstance(manifest, dict) and manifest.get("version"):
            return (str(manifest["version"]), None)

        return (None, None)

    async def refresh(self) -> None:
        """Re-download the store index and rebuild :attr:`packages`.

        Raises:
            StoreError: the index could not be fetched, or its content is not
                a valid store description.
        """
        session = async_get_clientsession(self.hass)
        try:
            async with session.get(self.config.store_url, timeout=20) as resp:
                if resp.status != 200:
                    raise StoreError(f"store_url returned {resp.status}")
                raw = await resp.text()
        except Exception as e:
            raise StoreError(f"Failed fetching store index: {e}") from e

        try:
            data = ha_yaml.parse_yaml(raw)
            if not isinstance(data, dict):
                raise StoreError("store.yaml must be a mapping")
            self.refresh_seconds = int(data.get("refresh_seconds", 300))
            parsed: dict[str, Package] = {}
            # ``packages:`` with no value parses as None; treat it as empty.
            for p in data.get("packages") or []:
                pkg = Package(
                    id=p["id"],
                    name=p.get("name", p["id"]),
                    type=p.get("type", "integration"),
                    domain=p["domain"],
                    repo=p["repo"],
                    owner=p["owner"],
                    repository=p["repository"],
                    branch=p.get("branch", "main"),
                    source_path=p["source_path"],
                )
                pkg.zip_url = self._zip_url(pkg.repo, pkg.branch)
                parsed[pkg.id] = pkg
        except Exception as e:
            raise StoreError(f"Invalid store.yaml: {e}") from e

        # Compute latest versions concurrently; each lookup is best-effort.
        pkgs = list(parsed.values())
        results = await asyncio.gather(*(self._fetch_latest_version(p) for p in pkgs))
        for pkg, (latest, rel_url) in zip(pkgs, results):
            pkg.latest_version = latest or "unknown"
            pkg.release_url = rel_url

        # Publish only after everything succeeded.
        self.packages = parsed

    def installed_version(self, domain: str) -> str | None:
        """Return the installed version of *domain*.

        Returns None when no ``manifest.json`` exists, and ``"unknown"`` when
        the manifest is unreadable, malformed or carries no version.
        Performs blocking file I/O.
        """
        manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json"))
        if not manifest.exists():
            return None
        try:
            data = json.loads(manifest.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            return "unknown"
        if not isinstance(data, dict):
            return "unknown"
        return str(data.get("version") or "unknown")

    def is_installed(self, domain: str) -> bool:
        """Return True when ``custom_components/<domain>`` exists."""
        return Path(self.hass.config.path("custom_components", domain)).exists()

    async def install_from_zip(self, pkg: Package) -> None:
        """Manual install/update: download ZIP and copy source_path into /config/custom_components/<domain>.

        Afterwards listeners are notified and a persistent notification asks
        the user to restart Home Assistant.

        Raises:
            StoreError: ``zip_url`` is unset, the domain is not a plain
                directory name, the download did not return 200, or
                ``source_path`` is missing from the archive.
        """
        if not pkg.zip_url:
            raise StoreError("zip_url not set")
        if not self._is_valid_domain(pkg.domain):
            raise StoreError(f"invalid domain: {pkg.domain!r}")

        session = async_get_clientsession(self.hass)
        async with session.get(pkg.zip_url, timeout=60) as resp:
            if resp.status != 200:
                raise StoreError(f"zip_url returned {resp.status}")
            archive = await resp.read()

        target = Path(self.hass.config.path("custom_components", pkg.domain))
        # All filesystem work is blocking, so it runs in the executor.
        await self.hass.async_add_executor_job(
            self._install_archive, archive, pkg.source_path, target
        )

        # After installation: rebuild entities (so it shows up as an update).
        self.signal_entities_updated()
        persistent_notification.async_create(
            self.hass,
            (
                f"**{pkg.name}** wurde installiert/aktualisiert.\n\n"
                "Bitte Home Assistant **neu starten**, damit die Änderungen aktiv werden."
            ),
            title="Bahmcloud Store",
            notification_id=f"{DOMAIN}_{pkg.domain}_restart_required",
        )

    @staticmethod
    def _install_archive(archive: bytes, source_path: str, target: Path) -> None:
        """Unpack *archive* and replace *target* with its *source_path* directory.

        Blocking; meant to be run in an executor. Raises StoreError when
        *source_path* is not found inside the archive.
        """
        with tempfile.TemporaryDirectory() as td:
            zip_path = Path(td) / "repo.zip"
            extract_dir = Path(td) / "extract"
            zip_path.write_bytes(archive)
            BahmcloudStore._extract_zip(zip_path, extract_dir)
            src = BahmcloudStore._find_source_path(extract_dir, source_path)
            if not src:
                raise StoreError(f"source_path not found in zip: {source_path}")
            if target.exists():
                shutil.rmtree(target)
            shutil.copytree(src, target)

    @staticmethod
    def _extract_zip(zip_path: Path, extract_dir: Path) -> None:
        """Extract *zip_path* into *extract_dir*, creating the directory if needed."""
        extract_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(extract_dir)

    @staticmethod
    def _find_source_path(extract_root: Path, source_path: str) -> Path | None:
        """Locate *source_path* inside an extracted archive.

        Tries ``extract_root/source_path`` first, then the same path below each
        direct child of *extract_root* (archives usually wrap their content in
        one top-level folder). Candidates that resolve outside *extract_root*
        (absolute paths, ``..`` segments) are ignored. Returns None when
        nothing matches.
        """
        root = extract_root.resolve()

        def _usable(candidate: Path) -> bool:
            if not candidate.exists():
                return False
            resolved = candidate.resolve()
            return resolved == root or root in resolved.parents

        direct = extract_root / source_path
        if _usable(direct):
            return direct
        for child in extract_root.iterdir():
            candidate = child / source_path
            if _usable(candidate):
                return candidate
        return None

    async def register_http_views(self) -> None:
        """Register HTTP views for static panel assets and JSON API."""
        self.hass.http.register_view(_StaticView())
        self.hass.http.register_view(_APIView(self))
class _StaticView(HomeAssistantView):
    """Serve the panel's static assets from the ``panel`` directory next to this file.

    IMPORTANT:
    Custom Panel JS modules are loaded WITHOUT Authorization headers.
    Therefore static panel assets must be publicly accessible (no auth).

    Because the endpoint is unauthenticated, the requested path is strictly
    confined to the ``panel`` directory.
    """

    requires_auth = False
    name = "bahmcloud_store_static"
    url = "/api/bahmcloud_store_static/{path:.*}"

    # Content types by lower-cased file suffix; anything else is served as
    # application/octet-stream.
    _CONTENT_TYPES = {
        ".js": "application/javascript",
        ".css": "text/css",
        ".html": "text/html",
        ".htm": "text/html",
    }

    async def get(self, request, path):
        """Return the asset at *path* (``index.html`` when empty) or a 404."""
        base = Path(__file__).resolve().parent / "panel"
        target = (base / (path or "index.html")).resolve()

        # Compare path components, not string prefixes: a prefix test would
        # also accept siblings such as ``panel.py`` or ``panel_backup/``.
        if base not in target.parents or not target.is_file():
            return web.Response(status=404, text="Not found")

        content_type = self._CONTENT_TYPES.get(
            target.suffix.lower(), "application/octet-stream"
        )
        return web.Response(body=target.read_bytes(), content_type=content_type)
class _APIView(HomeAssistantView):
    """
    Auth-protected API:
    GET /api/bahmcloud_store -> list packages
    POST /api/bahmcloud_store {op:...} -> install/update a package

    Errors are reported as JSON objects of the form ``{"error": "<message>"}``.
    """

    requires_auth = True
    name = "bahmcloud_store_api"
    url = "/api/bahmcloud_store"

    def __init__(self, store: BahmcloudStore) -> None:
        """Bind the view to the store it exposes."""
        self.store = store

    async def get(self, request):
        """Refresh the store and return all packages with their install state."""
        try:
            await self.store.refresh()
        except StoreError as err:
            _LOGGER.warning("Store refresh failed: %s", err)
            return self.json({"error": str(err)}, status_code=500)

        # Snapshot on the event loop, then do the blocking filesystem checks
        # (directory lookup, manifest read) in the executor.
        packages = list(self.store.packages.values())
        items = await self.store.hass.async_add_executor_job(
            self._describe_packages, packages
        )
        return self.json({"packages": items, "store_url": self.store.config.store_url})

    def _describe_packages(self, packages: list[Package]) -> list[dict[str, Any]]:
        """Build the JSON-serialisable description of *packages* (blocking I/O)."""
        items: list[dict[str, Any]] = []
        for pkg in packages:
            installed = self.store.is_installed(pkg.domain)
            items.append(
                {
                    "id": pkg.id,
                    "name": pkg.name,
                    "domain": pkg.domain,
                    "type": pkg.type,
                    "installed": installed,
                    "installed_version": self.store.installed_version(pkg.domain) if installed else None,
                    "latest_version": pkg.latest_version,
                    "repo": pkg.repo,
                    "release_url": pkg.release_url,
                }
            )
        return items

    async def post(self, request):
        """Install or update the package named in the JSON body.

        Expected body: ``{"op": "install" | "update", "package_id": <id>}``.
        Responds 400 for malformed requests, 404 for unknown packages, 500
        with the error message when the installation fails, and
        ``{"ok": true}`` on success.
        """
        try:
            data = await request.json()
        except ValueError:
            return self.json({"error": "invalid JSON body"}, status_code=400)
        if not isinstance(data, dict):
            return self.json({"error": "JSON object expected"}, status_code=400)

        op = data.get("op")
        package_id = data.get("package_id")
        if op not in ("install", "update"):
            return self.json({"error": "unknown op"}, status_code=400)
        if not package_id:
            return self.json({"error": "package_id missing"}, status_code=400)

        pkg = self.store.packages.get(package_id)
        if not pkg:
            return self.json({"error": "unknown package_id"}, status_code=404)

        try:
            await self.store.install_from_zip(pkg)
        except StoreError as err:
            _LOGGER.error("Installing %s failed: %s", package_id, err)
            return self.json({"error": str(err)}, status_code=500)
        return self.json({"ok": True})