From 714ced5d2ce612752c3046ca3db50fb512e26a50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Bachmann?= Date: Thu, 15 Jan 2026 10:13:30 +0000 Subject: [PATCH] custom_components/bahmcloud_store/views.py aktualisiert --- custom_components/bahmcloud_store/views.py | 192 +++++++++++---------- 1 file changed, 104 insertions(+), 88 deletions(-) diff --git a/custom_components/bahmcloud_store/views.py b/custom_components/bahmcloud_store/views.py index 378182c..087208d 100644 --- a/custom_components/bahmcloud_store/views.py +++ b/custom_components/bahmcloud_store/views.py @@ -1,128 +1,144 @@ from __future__ import annotations +import logging +from dataclasses import asdict from pathlib import Path -from typing import TYPE_CHECKING +from typing import Any from aiohttp import web -from homeassistant.components.http import HomeAssistantView -if TYPE_CHECKING: - from .core import BCSCore +from homeassistant.components.http import HomeAssistantView +from homeassistant.core import HomeAssistant + +from .core import BCSCore + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = "bahmcloud_store" class StaticAssetsView(HomeAssistantView): - """ - Static panel assets MUST be public (no auth), because Home Assistant loads - custom panel JS modules without auth headers. - """ - requires_auth = False - name = "bahmcloud_store_static" url = "/api/bahmcloud_store_static/{path:.*}" + name = "api:bahmcloud_store_static" + requires_auth = True - async def get(self, request, path): + async def get(self, request: web.Request, path: str) -> web.Response: base = Path(__file__).resolve().parent / "panel" - if not path: - path = "panel.js" + target = (base / path).resolve() - f = (base / path).resolve() + if not str(target).startswith(str(base.resolve())): + return web.Response(status=404) - # Prevent path traversal - if not str(f).startswith(str(base)) or not f.exists() or not f.is_file(): - return web.Response(status=404, text="Not found") + if target.is_dir(): + target = target / "index.html" - suffix = f.suffix.lower() - if suffix == ".js": - return web.Response(body=f.read_bytes(), content_type="application/javascript") - if suffix == ".css": - return web.Response(body=f.read_bytes(), content_type="text/css") - if suffix in (".html", ".htm"): - return web.Response(body=f.read_bytes(), content_type="text/html") + if not target.exists(): + return web.Response(status=404) - return web.Response(body=f.read_bytes(), content_type="application/octet-stream") + # Basic content types + ct = "text/plain" + if target.suffix == ".js": + ct = "application/javascript" + elif target.suffix == ".html": + ct = "text/html" + elif target.suffix == ".css": + ct = "text/css" + elif target.suffix == ".svg": + ct = "image/svg+xml" + elif target.suffix == ".png": + ct = "image/png" + + return web.Response(body=target.read_bytes(), content_type=ct) class BCSApiView(HomeAssistantView): - """ - BCS API (auth required) - - GET /api/bcs - - POST /api/bcs (op=add_custom_repo) - """ - requires_auth = True - name = "bcs_api" url = "/api/bcs" + name = "api:bcs" + requires_auth = True - def __init__(self, core: "BCSCore") -> None: + def __init__(self, core: BCSCore) -> None: self.core = core - async def get(self, request): - # Refresh on-demand so the UI "Refresh" button updates immediately. - await self.core.refresh() + async def get(self, request: web.Request) -> web.Response: + payload: dict[str, Any] = { + "ok": True, + "version": self.core.version, + "repos": self.core.list_repos_public(), + } + return web.json_response(payload) - return self.json( - { - "repos": self.core.list_repos_public(), - "store_url": self.core.config.store_url, - "refresh_seconds": self.core.refresh_seconds, - "version": self.core.version, - } - ) - - async def post(self, request): + async def post(self, request: web.Request) -> web.Response: data = await request.json() - op = str(data.get("op") or "").strip() + op = data.get("op") - if op != "add_custom_repo": - return self.json({"error": "unknown op"}, status_code=400) + if op == "add_custom_repo": + url = str(data.get("url") or "").strip() + name = data.get("name") + name = str(name).strip() if name else None + if not url: + return web.json_response({"ok": False, "message": "Missing url"}, status=400) + repo = await self.core.add_custom_repo(url=url, name=name) + return web.json_response({"ok": True, "repo": asdict(repo)}) - url = str(data.get("url") or "").strip() - name = data.get("name") - name = str(name).strip() if isinstance(name, str) and name.strip() else None + return web.json_response({"ok": False, "message": "Unknown operation"}, status=400) - if not url: - return self.json({"error": "url missing"}, status_code=400) - repo = await self.core.add_custom_repo(url=url, name=name) - return self.json({"ok": True, "repo": {"id": repo.id, "url": repo.url, "name": repo.name}}) +class BCSCustomRepoView(HomeAssistantView): + url = "/api/bcs/custom_repo" + name = "api:bcs_custom_repo" + requires_auth = True + + def __init__(self, core: BCSCore) -> None: + self.core = core + + async def delete(self, request: web.Request) -> web.Response: + repo_id = request.query.get("id") + if not repo_id: + return web.json_response({"ok": False, "message": "Missing id"}, status=400) + await self.core.remove_custom_repo(repo_id) + return web.json_response({"ok": True}) class BCSReadmeView(HomeAssistantView): - """ - - GET /api/bcs/readme?repo_id= - Returns README markdown from repository root (best-effort). - """ - requires_auth = True - name = "bcs_readme" url = "/api/bcs/readme" + name = "api:bcs_readme" + requires_auth = True - def __init__(self, core: "BCSCore") -> None: + def __init__(self, core: BCSCore) -> None: self.core = core - async def get(self, request): - repo_id = str(request.query.get("repo_id") or "").strip() + async def get(self, request: web.Request) -> web.Response: + repo_id = request.query.get("repo_id") if not repo_id: - return self.json({"error": "repo_id missing"}, status_code=400) - - repo = self.core.get_repo(repo_id) - if not repo: - return self.json({"error": "repo not found"}, status_code=404) + return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) md = await self.core.fetch_readme_markdown(repo_id) - if md is None: - return self.json( - { - "ok": False, - "repo_id": repo_id, - "url": repo.url, - "readme": None, - "message": "README not found in repository root.", - } - ) + if not md: + return web.json_response({"ok": False, "message": "README not found."}, status=404) - return self.json( - { - "ok": True, - "repo_id": repo_id, - "url": repo.url, - "readme": md, - } - ) + html = None + + # Render markdown via HA util (best effort, version-dependent) + try: + # Most HA versions ship a markdown renderer utility + from homeassistant.util.markdown import async_render_markdown # type: ignore + + rendered = await async_render_markdown(self.core.hass, md) # returns HTML string + html = rendered + except Exception as e: + _LOGGER.debug("Markdown render failed (util.markdown): %s", e) + html = None + + # Sanitize HTML if sanitizer exists + if html: + try: + from homeassistant.util.sanitize_html import async_sanitize_html # type: ignore + + html = await async_sanitize_html(self.core.hass, html) + except Exception as e: + _LOGGER.debug("HTML sanitize not available/failed: %s", e) + # If sanitizer is missing, still return rendered HTML (HA renderer is typically safe-ish), + # but keep it best-effort. + pass + + return web.json_response({"ok": True, "readme": md, "html": html})