From 6c3cdcde61aa1b415718b5f8578ee52d3feade28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Bachmann?= Date: Thu, 15 Jan 2026 14:08:27 +0000 Subject: [PATCH] Dateien nach "custom_components/bahmcloud_store" hochladen --- .../bahmcloud_store/providers.py | 378 ++++++++++++++++++ custom_components/bahmcloud_store/storage.py | 78 ++++ custom_components/bahmcloud_store/store.py | 338 ++++++++++++++++ custom_components/bahmcloud_store/update.py | 17 + custom_components/bahmcloud_store/views.py | 316 +++++++++++++++ 5 files changed, 1127 insertions(+) create mode 100644 custom_components/bahmcloud_store/providers.py create mode 100644 custom_components/bahmcloud_store/storage.py create mode 100644 custom_components/bahmcloud_store/store.py create mode 100644 custom_components/bahmcloud_store/update.py create mode 100644 custom_components/bahmcloud_store/views.py diff --git a/custom_components/bahmcloud_store/providers.py b/custom_components/bahmcloud_store/providers.py new file mode 100644 index 0000000..494f61d --- /dev/null +++ b/custom_components/bahmcloud_store/providers.py @@ -0,0 +1,378 @@ +from __future__ import annotations + +import logging +import re +import xml.etree.ElementTree as ET +from dataclasses import dataclass +from urllib.parse import quote_plus, urlparse + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.aiohttp_client import async_get_clientsession + +_LOGGER = logging.getLogger(__name__) + +UA = "BahmcloudStore (Home Assistant)" + + +@dataclass +class RepoInfo: + owner: str | None = None + repo_name: str | None = None + description: str | None = None + provider: str | None = None + default_branch: str | None = None + + latest_version: str | None = None + latest_version_source: str | None = None # "release" | "tag" | "atom" | None + + +def _normalize_repo_name(name: str | None) -> str | None: + if not name: + return None + n = name.strip() + if n.endswith(".git"): + n = n[:-4] + return n or None + + +def _split_owner_repo(repo_url: str) -> tuple[str | None, str | None]: + u = urlparse(repo_url.rstrip("/")) + parts = [p for p in u.path.strip("/").split("/") if p] + if len(parts) < 2: + return None, None + owner = parts[0].strip() or None + repo = _normalize_repo_name(parts[1]) + return owner, repo + + +def detect_provider(repo_url: str) -> str: + host = urlparse(repo_url).netloc.lower() + if "github.com" in host: + return "github" + if "gitlab" in host: + return "gitlab" + + owner, repo = _split_owner_repo(repo_url) + if owner and repo: + return "gitea" + + return "generic" + + +async def _safe_json(session, url: str, *, headers: dict | None = None, timeout: int = 20): + try: + async with session.get(url, timeout=timeout, headers=headers) as resp: + status = resp.status + if status != 200: + return None, status + return await resp.json(), status + except Exception: + return None, None + + +async def _safe_text(session, url: str, *, headers: dict | None = None, timeout: int = 20): + try: + async with session.get(url, timeout=timeout, headers=headers) as resp: + status = resp.status + if status != 200: + return None, status + return await resp.text(), status + except Exception: + return None, None + + +def _extract_tag_from_github_url(url: str) -> str | None: + m = re.search(r"/releases/tag/([^/?#]+)", url) + if m: + return m.group(1) + m = re.search(r"/tag/([^/?#]+)", url) + if m: + return m.group(1) + return None + + +def _strip_html(s: str) -> str: + # minimal HTML entity cleanup for meta descriptions + out = ( + s.replace("&", "&") + .replace(""", '"') + .replace("'", "'") + .replace("<", "<") + .replace(">", ">") + ) + return re.sub(r"\s+", " ", out).strip() + + +def _extract_meta(html: str, *, prop: str | None = None, name: str | None = None) -> str | None: + # Extract + # or + if prop: + # property="..." content="..." + m = re.search( + r']+property=["\']' + re.escape(prop) + r'["\'][^>]+content=["\']([^"\']+)["\']', + html, + flags=re.IGNORECASE, + ) + if m: + return _strip_html(m.group(1)) + m = re.search( + r']+content=["\']([^"\']+)["\'][^>]+property=["\']' + re.escape(prop) + r'["\']', + html, + flags=re.IGNORECASE, + ) + if m: + return _strip_html(m.group(1)) + + if name: + m = re.search( + r']+name=["\']' + re.escape(name) + r'["\'][^>]+content=["\']([^"\']+)["\']', + html, + flags=re.IGNORECASE, + ) + if m: + return _strip_html(m.group(1)) + m = re.search( + r']+content=["\']([^"\']+)["\'][^>]+name=["\']' + re.escape(name) + r'["\']', + html, + flags=re.IGNORECASE, + ) + if m: + return _strip_html(m.group(1)) + + return None + + +async def _github_description_html(hass: HomeAssistant, owner: str, repo: str) -> str | None: + """ + GitHub API may be rate-limited; fetch public HTML and read meta description. + """ + session = async_get_clientsession(hass) + headers = { + "User-Agent": UA, + "Accept": "text/html,application/xhtml+xml", + } + + html, status = await _safe_text(session, f"https://github.com/{owner}/{repo}", headers=headers) + if not html or status != 200: + return None + + desc = _extract_meta(html, prop="og:description") + if desc: + return desc + + desc = _extract_meta(html, name="description") + if desc: + return desc + + return None + + +async def _github_latest_version_atom(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]: + session = async_get_clientsession(hass) + headers = {"User-Agent": UA, "Accept": "application/atom+xml,text/xml;q=0.9,*/*;q=0.8"} + + xml_text, _ = await _safe_text(session, f"https://github.com/{owner}/{repo}/releases.atom", headers=headers) + if not xml_text: + return None, None + + try: + root = ET.fromstring(xml_text) + except Exception: + return None, None + + for entry in root.findall(".//{*}entry"): + for link in entry.findall(".//{*}link"): + href = link.attrib.get("href") + if not href: + continue + tag = _extract_tag_from_github_url(href) + if tag: + return tag, "atom" + + return None, None + + +async def _github_latest_version_redirect(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]: + session = async_get_clientsession(hass) + headers = {"User-Agent": UA} + url = f"https://github.com/{owner}/{repo}/releases/latest" + try: + async with session.head(url, allow_redirects=False, timeout=15, headers=headers) as resp: + if resp.status in (301, 302, 303, 307, 308): + loc = resp.headers.get("Location") + if loc: + tag = _extract_tag_from_github_url(loc) + if tag: + return tag, "release" + except Exception: + pass + return None, None + + +async def _github_latest_version_api(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]: + session = async_get_clientsession(hass) + headers = {"Accept": "application/vnd.github+json", "User-Agent": UA} + + data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/releases/latest", headers=headers) + if isinstance(data, dict): + tag = data.get("tag_name") or data.get("name") + if isinstance(tag, str) and tag.strip(): + return tag.strip(), "release" + + data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=1", headers=headers) + if isinstance(data, list) and data: + tag = data[0].get("name") + if isinstance(tag, str) and tag.strip(): + return tag.strip(), "tag" + + return None, None + + +async def _github_latest_version(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]: + tag, src = await _github_latest_version_atom(hass, owner, repo) + if tag: + return tag, src + + tag, src = await _github_latest_version_redirect(hass, owner, repo) + if tag: + return tag, src + + return await _github_latest_version_api(hass, owner, repo) + + +async def _gitea_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]: + session = async_get_clientsession(hass) + + data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/releases?limit=1") + if isinstance(data, list) and data: + tag = data[0].get("tag_name") or data[0].get("name") + if isinstance(tag, str) and tag.strip(): + return tag.strip(), "release" + + data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/tags?limit=1") + if isinstance(data, list) and data: + tag = data[0].get("name") + if isinstance(tag, str) and tag.strip(): + return tag.strip(), "tag" + + return None, None + + +async def _gitlab_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]: + session = async_get_clientsession(hass) + headers = {"User-Agent": UA} + project = quote_plus(f"{owner}/{repo}") + + data, _ = await _safe_json( + session, + f"{base}/api/v4/projects/{project}/releases?per_page=1&order_by=released_at&sort=desc", + headers=headers, + ) + if isinstance(data, list) and data: + tag = data[0].get("tag_name") or data[0].get("name") + if isinstance(tag, str) and tag.strip(): + return tag.strip(), "release" + + data, _ = await _safe_json( + session, + f"{base}/api/v4/projects/{project}/repository/tags?per_page=1&order_by=updated&sort=desc", + headers=headers, + ) + if isinstance(data, list) and data: + tag = data[0].get("name") + if isinstance(tag, str) and tag.strip(): + return tag.strip(), "tag" + + return None, None + + +async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo: + provider = detect_provider(repo_url) + owner, repo = _split_owner_repo(repo_url) + + info = RepoInfo( + owner=owner, + repo_name=repo, + description=None, + provider=provider, + default_branch=None, + latest_version=None, + latest_version_source=None, + ) + + if not owner or not repo: + return info + + session = async_get_clientsession(hass) + + try: + if provider == "github": + # Try API repo details (may be rate-limited) + headers = {"Accept": "application/vnd.github+json", "User-Agent": UA} + data, status = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}", headers=headers) + + if isinstance(data, dict): + info.description = data.get("description") + info.repo_name = _normalize_repo_name(data.get("name")) or repo + info.default_branch = data.get("default_branch") or "main" + if isinstance(data.get("owner"), dict) and data["owner"].get("login"): + info.owner = data["owner"]["login"] + else: + # If API blocked, still set reasonable defaults + if status == 403: + _LOGGER.debug("GitHub API blocked/rate-limited for repo info %s/%s", owner, repo) + info.default_branch = "main" + + # If description missing, fetch from GitHub HTML + if not info.description: + desc = await _github_description_html(hass, owner, repo) + if desc: + info.description = desc + + ver, src = await _github_latest_version(hass, owner, repo) + info.latest_version = ver + info.latest_version_source = src + return info + + if provider == "gitlab": + u = urlparse(repo_url.rstrip("/")) + base = f"{u.scheme}://{u.netloc}" + headers = {"User-Agent": UA} + project = quote_plus(f"{owner}/{repo}") + + data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}", headers=headers) + if isinstance(data, dict): + info.description = data.get("description") + info.repo_name = _normalize_repo_name(data.get("path")) or repo + info.default_branch = data.get("default_branch") or "main" + ns = data.get("namespace") + if isinstance(ns, dict) and ns.get("path"): + info.owner = ns.get("path") + + ver, src = await _gitlab_latest_version(hass, base, owner, repo) + info.latest_version = ver + info.latest_version_source = src + return info + + if provider == "gitea": + u = urlparse(repo_url.rstrip("/")) + base = f"{u.scheme}://{u.netloc}" + + data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}") + if isinstance(data, dict): + info.description = data.get("description") + info.repo_name = _normalize_repo_name(data.get("name")) or repo + info.default_branch = data.get("default_branch") or "main" + if isinstance(data.get("owner"), dict) and data["owner"].get("login"): + info.owner = data["owner"]["login"] + + ver, src = await _gitea_latest_version(hass, base, owner, repo) + info.latest_version = ver + info.latest_version_source = src + return info + + return info + + except Exception as e: + _LOGGER.debug("Provider fetch failed for %s: %s", repo_url, e) + return info \ No newline at end of file diff --git a/custom_components/bahmcloud_store/storage.py b/custom_components/bahmcloud_store/storage.py new file mode 100644 index 0000000..8f359cd --- /dev/null +++ b/custom_components/bahmcloud_store/storage.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +import uuid +from dataclasses import dataclass +from typing import Any + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.storage import Store + +_STORAGE_VERSION = 1 +_STORAGE_KEY = "bcs_store" + + +@dataclass +class CustomRepo: + id: str + url: str + name: str | None = None + + +class BCSStorage: + """Persistent storage for manually added repositories.""" + + def __init__(self, hass: HomeAssistant) -> None: + self.hass = hass + self._store = Store(hass, _STORAGE_VERSION, _STORAGE_KEY) + + async def _load(self) -> dict[str, Any]: + data = await self._store.async_load() + if not data: + return {"custom_repos": []} + if "custom_repos" not in data: + data["custom_repos"] = [] + return data + + async def _save(self, data: dict[str, Any]) -> None: + await self._store.async_save(data) + + async def list_custom_repos(self) -> list[CustomRepo]: + data = await self._load() + repos = data.get("custom_repos", []) + out: list[CustomRepo] = [] + for r in repos: + if not isinstance(r, dict): + continue + rid = str(r.get("id") or "") + url = str(r.get("url") or "") + name = r.get("name") + if rid and url: + out.append(CustomRepo(id=rid, url=url, name=str(name) if name else None)) + return out + + async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo: + data = await self._load() + repos = data.get("custom_repos", []) + + # Deduplicate by URL + for r in repos: + if isinstance(r, dict) and str(r.get("url", "")).strip() == url.strip(): + # Update name if provided + if name: + r["name"] = name + await self._save(data) + return CustomRepo(id=str(r["id"]), url=str(r["url"]), name=r.get("name")) + + rid = f"custom:{uuid.uuid4().hex[:10]}" + entry = {"id": rid, "url": url.strip(), "name": name.strip() if name else None} + repos.append(entry) + data["custom_repos"] = repos + await self._save(data) + return CustomRepo(id=rid, url=entry["url"], name=entry["name"]) + + async def remove_custom_repo(self, repo_id: str) -> None: + data = await self._load() + repos = data.get("custom_repos", []) + data["custom_repos"] = [r for r in repos if not (isinstance(r, dict) and r.get("id") == repo_id)] + await self._save(data) + diff --git a/custom_components/bahmcloud_store/store.py b/custom_components/bahmcloud_store/store.py new file mode 100644 index 0000000..b7d41d5 --- /dev/null +++ b/custom_components/bahmcloud_store/store.py @@ -0,0 +1,338 @@ +from __future__ import annotations + +import json +import logging +import shutil +import tempfile +import zipfile +from dataclasses import dataclass +from pathlib import Path +from typing import Any +from urllib.parse import urlparse + +from aiohttp import web +from homeassistant.core import HomeAssistant +from homeassistant.helpers.aiohttp_client import async_get_clientsession +from homeassistant.components import persistent_notification +from homeassistant.util import yaml as ha_yaml +from homeassistant.components.http import HomeAssistantView + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = "bahmcloud_store" + + +class StoreError(Exception): + """Store error.""" + + +@dataclass +class StoreConfig: + store_url: str + + +@dataclass +class Package: + id: str + name: str + type: str # "integration" | "store" + domain: str + repo: str + owner: str + repository: str + branch: str + source_path: str + + # computed each refresh + latest_version: str | None = None + zip_url: str | None = None + release_url: str | None = None + + +class BahmcloudStore: + def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None: + self.hass = hass + self.config = config + self.packages: dict[str, Package] = {} + self.refresh_seconds: int = 300 + self._listeners: list[callable] = [] + + def add_listener(self, cb) -> None: + self._listeners.append(cb) + + def signal_entities_updated(self) -> None: + for cb in list(self._listeners): + try: + cb() + except Exception: + pass + + @staticmethod + def _zip_url(repo: str, branch: str) -> str: + return f"{repo.rstrip('/')}/archive/{branch}.zip" + + @staticmethod + def _base_from_repo(repo_url: str) -> str: + u = urlparse(repo_url.rstrip("/")) + return f"{u.scheme}://{u.netloc}" + + @staticmethod + def _raw_manifest_url(repo: str, branch: str, source_path: str) -> str: + # Example: + # https://git.bahmcloud.de/bahmcloud/easy_proxmox/raw/branch/main/custom_components/easy_proxmox/manifest.json + return f"{repo.rstrip('/')}/raw/branch/{branch}/{source_path.rstrip('/')}/manifest.json" + + async def _fetch_latest_version(self, pkg: Package) -> tuple[str | None, str | None]: + """ + Returns (latest_version, release_url) + Strategy: + 1) releases/latest -> tag_name + 2) tags?limit=1 -> first tag name + 3) fallback: read manifest.json from repo (version field) + """ + session = async_get_clientsession(self.hass) + base = self._base_from_repo(pkg.repo) + + # 1) latest release + latest_release_api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}/releases/latest" + try: + async with session.get(latest_release_api, timeout=20) as resp: + if resp.status == 200: + data = await resp.json() + tag = data.get("tag_name") + html_url = data.get("html_url") + if tag: + return (str(tag), str(html_url) if html_url else None) + except Exception: + pass + + # 2) tags fallback + tags_api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}/tags?limit=1" + try: + async with session.get(tags_api, timeout=20) as resp: + if resp.status == 200: + tags = await resp.json() + if tags and isinstance(tags, list): + name = tags[0].get("name") + if name: + return (str(name), None) + except Exception: + pass + + # 3) fallback: manifest.json version from repo + try: + manifest_url = self._raw_manifest_url(pkg.repo, pkg.branch, pkg.source_path) + async with session.get(manifest_url, timeout=20) as resp: + if resp.status == 200: + text = await resp.text() + data = json.loads(text) + ver = data.get("version") + if ver: + return (str(ver), None) + except Exception: + pass + + return (None, None) + + async def refresh(self) -> None: + session = async_get_clientsession(self.hass) + + try: + async with session.get(self.config.store_url, timeout=20) as resp: + if resp.status != 200: + raise StoreError(f"store_url returned {resp.status}") + raw = await resp.text() + except Exception as e: + raise StoreError(f"Failed fetching store index: {e}") from e + + try: + data = ha_yaml.parse_yaml(raw) + if not isinstance(data, dict): + raise StoreError("store.yaml must be a mapping") + + self.refresh_seconds = int(data.get("refresh_seconds", 300)) + pkgs = data.get("packages", []) + parsed: dict[str, Package] = {} + + for p in pkgs: + pkg = Package( + id=p["id"], + name=p.get("name", p["id"]), + type=p.get("type", "integration"), + domain=p["domain"], + repo=p["repo"], + owner=p["owner"], + repository=p["repository"], + branch=p.get("branch", "main"), + source_path=p["source_path"], + ) + pkg.zip_url = self._zip_url(pkg.repo, pkg.branch) + parsed[pkg.id] = pkg + + # compute latest versions + for pkg in parsed.values(): + latest, rel_url = await self._fetch_latest_version(pkg) + pkg.latest_version = latest or "unknown" + pkg.release_url = rel_url + + self.packages = parsed + + except Exception as e: + raise StoreError(f"Invalid store.yaml: {e}") from e + + def installed_version(self, domain: str) -> str | None: + manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json")) + if not manifest.exists(): + return None + try: + data = json.loads(manifest.read_text(encoding="utf-8")) + return str(data.get("version") or "unknown") + except Exception: + return "unknown" + + def is_installed(self, domain: str) -> bool: + return Path(self.hass.config.path("custom_components", domain)).exists() + + async def install_from_zip(self, pkg: Package) -> None: + """Manual install/update: download ZIP and copy source_path into /config/custom_components/.""" + if not pkg.zip_url: + raise StoreError("zip_url not set") + + session = async_get_clientsession(self.hass) + + with tempfile.TemporaryDirectory() as td: + zip_path = Path(td) / "repo.zip" + extract_dir = Path(td) / "extract" + + async with session.get(pkg.zip_url, timeout=60) as resp: + if resp.status != 200: + raise StoreError(f"zip_url returned {resp.status}") + zip_path.write_bytes(await resp.read()) + + await self.hass.async_add_executor_job(self._extract_zip, zip_path, extract_dir) + + src = self._find_source_path(extract_dir, pkg.source_path) + if not src: + raise StoreError(f"source_path not found in zip: {pkg.source_path}") + + target = Path(self.hass.config.path("custom_components", pkg.domain)) + if target.exists(): + shutil.rmtree(target) + shutil.copytree(src, target) + + # Nach Installation: Entities neu aufbauen (damit es als Update auftaucht) + self.signal_entities_updated() + + persistent_notification.async_create( + self.hass, + ( + f"**{pkg.name}** wurde installiert/aktualisiert.\n\n" + "Bitte Home Assistant **neu starten**, damit die Änderungen aktiv werden." + ), + title="Bahmcloud Store", + notification_id=f"{DOMAIN}_{pkg.domain}_restart_required", + ) + + @staticmethod + def _extract_zip(zip_path: Path, extract_dir: Path) -> None: + extract_dir.mkdir(parents=True, exist_ok=True) + with zipfile.ZipFile(zip_path, "r") as zf: + zf.extractall(extract_dir) + + @staticmethod + def _find_source_path(extract_root: Path, source_path: str) -> Path | None: + direct = extract_root / source_path + if direct.exists(): + return direct + for child in extract_root.iterdir(): + candidate = child / source_path + if candidate.exists(): + return candidate + return None + + async def register_http_views(self) -> None: + """Register HTTP views for static panel assets and JSON API.""" + self.hass.http.register_view(_StaticView()) + self.hass.http.register_view(_APIView(self)) + + +class _StaticView(HomeAssistantView): + """ + IMPORTANT: + Custom Panel JS modules are loaded WITHOUT Authorization headers. + Therefore static panel assets must be publicly accessible (no auth). + """ + requires_auth = False + name = "bahmcloud_store_static" + url = "/api/bahmcloud_store_static/{path:.*}" + + async def get(self, request, path): + base = Path(__file__).resolve().parent / "panel" + if not path: + path = "index.html" + + f = (base / path).resolve() + + if not str(f).startswith(str(base)) or not f.exists() or not f.is_file(): + return web.Response(status=404, text="Not found") + + suffix = f.suffix.lower() + if suffix == ".js": + return web.Response(body=f.read_bytes(), content_type="application/javascript") + if suffix == ".css": + return web.Response(body=f.read_bytes(), content_type="text/css") + if suffix in (".html", ".htm"): + return web.Response(body=f.read_bytes(), content_type="text/html") + + return web.Response(body=f.read_bytes(), content_type="application/octet-stream") + + +class _APIView(HomeAssistantView): + """ + Auth-protected API: + GET /api/bahmcloud_store -> list packages + POST /api/bahmcloud_store {op:...} -> install/update a package + """ + requires_auth = True + name = "bahmcloud_store_api" + url = "/api/bahmcloud_store" + + def __init__(self, store: BahmcloudStore) -> None: + self.store = store + + async def get(self, request): + await self.store.refresh() + items: list[dict[str, Any]] = [] + for pkg in self.store.packages.values(): + installed = self.store.is_installed(pkg.domain) + items.append( + { + "id": pkg.id, + "name": pkg.name, + "domain": pkg.domain, + "type": pkg.type, + "installed": installed, + "installed_version": self.store.installed_version(pkg.domain) if installed else None, + "latest_version": pkg.latest_version, + "repo": pkg.repo, + "release_url": pkg.release_url, + } + ) + return self.json({"packages": items, "store_url": self.store.config.store_url}) + + async def post(self, request): + data = await request.json() + op = data.get("op") + package_id = data.get("package_id") + + if op not in ("install", "update"): + return self.json({"error": "unknown op"}, status_code=400) + if not package_id: + return self.json({"error": "package_id missing"}, status_code=400) + + pkg = self.store.packages.get(package_id) + if not pkg: + return self.json({"error": "unknown package_id"}, status_code=404) + + await self.store.install_from_zip(pkg) + return self.json({"ok": True}) diff --git a/custom_components/bahmcloud_store/update.py b/custom_components/bahmcloud_store/update.py new file mode 100644 index 0000000..1c07482 --- /dev/null +++ b/custom_components/bahmcloud_store/update.py @@ -0,0 +1,17 @@ +from __future__ import annotations + +# NOTE: +# Update entities will be implemented once installation/provider resolution is in place. +# This stub prevents platform load errors and keeps the integration stable in 0.3.0. + +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity_platform import AddEntitiesCallback + + +async def async_setup_platform( + hass: HomeAssistant, + config, + async_add_entities: AddEntitiesCallback, + discovery_info=None, +): + return diff --git a/custom_components/bahmcloud_store/views.py b/custom_components/bahmcloud_store/views.py new file mode 100644 index 0000000..fdb1b70 --- /dev/null +++ b/custom_components/bahmcloud_store/views.py @@ -0,0 +1,316 @@ +from __future__ import annotations + +import base64 +import logging +from dataclasses import asdict +from pathlib import Path +from typing import Any, TYPE_CHECKING + +from aiohttp import web +from homeassistant.components.http import HomeAssistantView + +if TYPE_CHECKING: + from .core import BCSCore # typing only + +_LOGGER = logging.getLogger(__name__) + + +def _render_markdown_server_side(md: str) -> str | None: + """Render Markdown -> sanitized HTML (server-side).""" + text = (md or "").strip() + if not text: + return None + + html: str | None = None + + # 1) python-markdown + try: + import markdown as mdlib # type: ignore + + html = mdlib.markdown( + text, + extensions=["fenced_code", "tables", "sane_lists", "toc"], + output_format="html5", + ) + except Exception as e: + _LOGGER.debug("python-markdown render failed: %s", e) + html = None + + if not html: + return None + + # 2) Sanitize via bleach + try: + import bleach # type: ignore + + allowed_tags = [ + "p", + "br", + "hr", + "div", + "span", + "blockquote", + "pre", + "code", + "h1", + "h2", + "h3", + "h4", + "h5", + "h6", + "ul", + "ol", + "li", + "strong", + "em", + "b", + "i", + "u", + "s", + "a", + "img", + "table", + "thead", + "tbody", + "tr", + "th", + "td", + ] + + allowed_attrs = { + "a": ["href", "title", "target", "rel"], + "img": ["src", "alt", "title"], + "th": ["align"], + "td": ["align"], + "*": ["class"], + } + + sanitized = bleach.clean( + html, + tags=allowed_tags, + attributes=allowed_attrs, + protocols=["http", "https", "mailto"], + strip=True, + ) + + sanitized = sanitized.replace( + ' str | None: + if not isinstance(content, str): + return None + enc = "" + if isinstance(encoding, str): + enc = encoding.strip().lower() + if "base64" not in enc: + return None + try: + raw = base64.b64decode(content.encode("utf-8"), validate=False) + return raw.decode("utf-8", errors="replace") + except Exception: + return None + + +def _extract_text_recursive(obj: Any, depth: int = 0) -> str | None: + """ + Robust extraction for README markdown. + + Handles: + - str / bytes + - dict with: + - {content: "...", encoding: "base64"} (possibly nested) + - {readme: "..."} etc. + - list of dicts (pick first matching) + """ + if obj is None: + return None + + if isinstance(obj, bytes): + try: + return obj.decode("utf-8", errors="replace") + except Exception: + return None + + if isinstance(obj, str): + return obj + + if depth > 8: + return None + + if isinstance(obj, dict): + # 1) If it looks like "file content" + content = obj.get("content") + encoding = obj.get("encoding") + + # Base64 decode if possible + decoded = _maybe_decode_base64(content, encoding) + if decoded: + return decoded + + # content may already be plain text + if isinstance(content, str) and (not isinstance(encoding, str) or not encoding.strip()): + # Heuristic: treat as markdown if it has typical markdown chars, otherwise still return + return content + + # 2) direct text keys (readme/markdown/text/body/data) + for k in _TEXT_KEYS: + v = obj.get(k) + if isinstance(v, str): + return v + if isinstance(v, bytes): + try: + return v.decode("utf-8", errors="replace") + except Exception: + pass + + # 3) Sometimes nested under "file" / "result" / "payload" etc. + for v in obj.values(): + out = _extract_text_recursive(v, depth + 1) + if out: + return out + + return None + + if isinstance(obj, list): + for item in obj: + out = _extract_text_recursive(item, depth + 1) + if out: + return out + return None + + return None + + +class StaticAssetsView(HomeAssistantView): + url = "/api/bahmcloud_store_static/{path:.*}" + name = "api:bahmcloud_store_static" + requires_auth = False + + async def get(self, request: web.Request, path: str) -> web.Response: + base = Path(__file__).resolve().parent / "panel" + base_resolved = base.resolve() + + req_path = (path or "").lstrip("/") + if req_path == "": + req_path = "index.html" + + target = (base / req_path).resolve() + + if not str(target).startswith(str(base_resolved)): + return web.Response(status=404) + + if target.is_dir(): + target = (target / "index.html").resolve() + + if not target.exists(): + _LOGGER.error("BCS static asset not found: %s", target) + return web.Response(status=404) + + content_type = "text/plain" + charset = None + + if target.suffix == ".js": + content_type = "application/javascript" + charset = "utf-8" + elif target.suffix == ".html": + content_type = "text/html" + charset = "utf-8" + elif target.suffix == ".css": + content_type = "text/css" + charset = "utf-8" + elif target.suffix == ".svg": + content_type = "image/svg+xml" + elif target.suffix == ".png": + content_type = "image/png" + + resp = web.Response(body=target.read_bytes(), content_type=content_type, charset=charset) + resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" + resp.headers["Pragma"] = "no-cache" + return resp + + +class BCSApiView(HomeAssistantView): + url = "/api/bcs" + name = "api:bcs" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core = core + + async def get(self, request: web.Request) -> web.Response: + return web.json_response( + {"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()} + ) + + async def post(self, request: web.Request) -> web.Response: + data = await request.json() + op = data.get("op") + + if op == "add_custom_repo": + url = str(data.get("url") or "").strip() + name = data.get("name") + name = str(name).strip() if name else None + if not url: + return web.json_response({"ok": False, "message": "Missing url"}, status=400) + repo = await self.core.add_custom_repo(url=url, name=name) + return web.json_response({"ok": True, "repo": asdict(repo)}) + + return web.json_response({"ok": False, "message": "Unknown operation"}, status=400) + + +class BCSCustomRepoView(HomeAssistantView): + url = "/api/bcs/custom_repo" + name = "api:bcs_custom_repo" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core = core + + async def delete(self, request: web.Request) -> web.Response: + repo_id = request.query.get("id") + if not repo_id: + return web.json_response({"ok": False, "message": "Missing id"}, status=400) + await self.core.remove_custom_repo(repo_id) + return web.json_response({"ok": True}) + + +class BCSReadmeView(HomeAssistantView): + url = "/api/bcs/readme" + name = "api:bcs_readme" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core = core + + async def get(self, request: web.Request) -> web.Response: + repo_id = request.query.get("repo_id") + if not repo_id: + return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) + + maybe_md = await self.core.fetch_readme_markdown(repo_id) + + md = _extract_text_recursive(maybe_md) + if not md or not md.strip(): + t = type(maybe_md).__name__ + return web.json_response( + {"ok": False, "message": f"README not found or unsupported format (got {t})."}, + status=404, + ) + + # Ensure strict JSON string output (avoid accidental objects) + md_str = str(md) + + html = _render_markdown_server_side(md_str) + return web.json_response({"ok": True, "readme": md_str, "html": html})