diff --git a/custom_components/bahmcloud_store/views.py b/custom_components/bahmcloud_store/views.py index a9cebfd..7e5effc 100644 --- a/custom_components/bahmcloud_store/views.py +++ b/custom_components/bahmcloud_store/views.py @@ -1,85 +1,292 @@ -"""HTTP views for Bahmcloud Store (BCS). - -This module exposes the internal store state via a small REST API and serves -static panel assets. -""" - from __future__ import annotations +import base64 import logging -from typing import Any +from dataclasses import asdict +from pathlib import Path +from typing import Any, TYPE_CHECKING from aiohttp import web - from homeassistant.components.http import HomeAssistantView -from homeassistant.core import HomeAssistant -from .const import DOMAIN +if TYPE_CHECKING: + from .core import BCSCore # typing only _LOGGER = logging.getLogger(__name__) -def _get_core(hass: HomeAssistant): - data = hass.data.get(DOMAIN) - if not data or "core" not in data: - raise RuntimeError("BCS core not initialized") - return data["core"] +def _render_markdown_server_side(md: str) -> str | None: + text = (md or "").strip() + if not text: + return None + html: str | None = None -class BcsBaseView(HomeAssistantView): - """Base view with shared helpers.""" + try: + import markdown as mdlib # type: ignore - requires_auth = True + html = mdlib.markdown( + text, + extensions=["fenced_code", "tables", "sane_lists", "toc"], + output_format="html5", + ) + except Exception as e: + _LOGGER.debug("python-markdown render failed: %s", e) + html = None - def json(self, payload: dict[str, Any], status: int = 200) -> web.Response: - return web.json_response(payload, status=status) + if not html: + return None - def error(self, message: str, status: int = 400) -> web.Response: - return self.json({"ok": False, "error": message}, status=status) + try: + import bleach # type: ignore + allowed_tags = [ + "p", + "br", + "hr", + "div", + "span", + "blockquote", + "pre", + "code", + "h1", + "h2", + "h3", + "h4", + "h5", + "h6", + "ul", + "ol", + "li", + "strong", + "em", + "b", + "i", + "u", + "s", + "a", + "img", + "table", + "thead", + "tbody", + "tr", + "th", + "td", + ] -class BcsInstallDryRunView(BcsBaseView): - """POST /api/bcs/install?repo_id=... (dry-run).""" + allowed_attrs = { + "a": ["href", "title", "target", "rel"], + "img": ["src", "alt", "title"], + "th": ["align"], + "td": ["align"], + "*": ["class"], + } - url = "/api/bcs/install" - name = "api:bcs:install" - - async def post(self, request: web.Request) -> web.Response: - hass: HomeAssistant = request.app["hass"] - core = _get_core(hass) - - repo_id = request.query.get("repo_id") - if not repo_id: - return self.error("Missing required query parameter: repo_id", status=400) - - _LOGGER.info("BCS install dry-run requested (repo_id=%s)", repo_id) - - try: - result = await core.async_install_dry_run(repo_id) - except core.RepoNotFoundError: - _LOGGER.warning("BCS install dry-run failed: repo not found (repo_id=%s)", repo_id) - return self.error("Repository not found", status=404) - except core.DomainResolutionError as err: - _LOGGER.warning( - "BCS install dry-run failed: domain resolution error (repo_id=%s): %s", - repo_id, - err, - ) - return self.error(str(err), status=422) - except Exception as err: # pylint: disable=broad-exception-caught - _LOGGER.exception("BCS install dry-run failed (repo_id=%s): %s", repo_id, err) - return self.error("Internal error", status=500) - - return self.json( - { - "ok": True, - "action": "install", - "domain": result["domain"], - }, - status=200, + sanitized = bleach.clean( + html, + tags=allowed_tags, + attributes=allowed_attrs, + protocols=["http", "https", "mailto"], + strip=True, ) + sanitized = sanitized.replace( + ' str | None: + if not isinstance(content, str): + return None + enc = "" + if isinstance(encoding, str): + enc = encoding.strip().lower() + if "base64" not in enc: + return None + try: + raw = base64.b64decode(content.encode("utf-8"), validate=False) + return raw.decode("utf-8", errors="replace") + except Exception: + return None + + +def _extract_text_recursive(obj: Any, depth: int = 0) -> str | None: + if obj is None: + return None + + if isinstance(obj, bytes): + try: + return obj.decode("utf-8", errors="replace") + except Exception: + return None + + if isinstance(obj, str): + return obj + + if depth > 8: + return None + + if isinstance(obj, dict): + content = obj.get("content") + encoding = obj.get("encoding") + + decoded = _maybe_decode_base64(content, encoding) + if decoded: + return decoded + + if isinstance(content, str) and (not isinstance(encoding, str) or not encoding.strip()): + return content + + for k in _TEXT_KEYS: + v = obj.get(k) + if isinstance(v, str): + return v + if isinstance(v, bytes): + try: + return v.decode("utf-8", errors="replace") + except Exception: + pass + + for v in obj.values(): + out = _extract_text_recursive(v, depth + 1) + if out: + return out + + return None + + if isinstance(obj, list): + for item in obj: + out = _extract_text_recursive(item, depth + 1) + if out: + return out + return None + + return None + + +class StaticAssetsView(HomeAssistantView): + url = "/api/bahmcloud_store_static/{path:.*}" + name = "api:bahmcloud_store_static" + requires_auth = False + + async def get(self, request: web.Request, path: str) -> web.StreamResponse: + base = Path(__file__).resolve().parent / "panel" + base_resolved = base.resolve() + + req_path = (path or "").lstrip("/") + if req_path == "": + req_path = "index.html" + + target = (base / req_path).resolve() + + if not str(target).startswith(str(base_resolved)): + return web.Response(status=404) + + if target.is_dir(): + target = (target / "index.html").resolve() + + if not target.exists(): + _LOGGER.error("BCS static asset not found: %s", target) + return web.Response(status=404) + + resp = web.FileResponse(path=target) + resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" + resp.headers["Pragma"] = "no-cache" + return resp + + +class BCSApiView(HomeAssistantView): + url = "/api/bcs" + name = "api:bcs" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core: BCSCore = core + + async def get(self, request: web.Request) -> web.Response: + return web.json_response( + {"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()} + ) + + async def post(self, request: web.Request) -> web.Response: + action = request.query.get("action") + if action == "refresh": + _LOGGER.info("BCS manual refresh triggered via API") + try: + await self.core.full_refresh(source="manual") + return web.json_response({"ok": True}) + except Exception as e: + _LOGGER.error("BCS manual refresh failed: %s", e) + return web.json_response({"ok": False, "message": "Refresh failed"}, status=500) + + try: + data = await request.json() + except Exception: + data = {} + + op = data.get("op") + + if op == "add_custom_repo": + url = str(data.get("url") or "").strip() + name = data.get("name") + name = str(name).strip() if name else None + if not url: + return web.json_response({"ok": False, "message": "Missing url"}, status=400) + repo = await self.core.add_custom_repo(url=url, name=name) + return web.json_response({"ok": True, "repo": asdict(repo)}) + + return web.json_response({"ok": False, "message": "Unknown operation"}, status=400) + + +class BCSCustomRepoView(HomeAssistantView): + url = "/api/bcs/custom_repo" + name = "api:bcs_custom_repo" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core: BCSCore = core + + async def delete(self, request: web.Request) -> web.Response: + repo_id = request.query.get("id") + if not repo_id: + return web.json_response({"ok": False, "message": "Missing id"}, status=400) + await self.core.remove_custom_repo(repo_id) + return web.json_response({"ok": True}) + + +class BCSReadmeView(HomeAssistantView): + url = "/api/bcs/readme" + name = "api:bcs_readme" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core: BCSCore = core + + async def get(self, request: web.Request) -> web.Response: + repo_id = request.query.get("repo_id") + if not repo_id: + return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) + + maybe_md = await self.core.fetch_readme_markdown(repo_id) + + md = _extract_text_recursive(maybe_md) + if not md or not md.strip(): + t = type(maybe_md).__name__ + return web.json_response( + {"ok": False, "message": f"README not found or unsupported format (got {t})."}, + status=404, + ) + + md_str = str(md) + html = _render_markdown_server_side(md_str) + return web.json_response({"ok": True, "readme": md_str, "html": html})