from __future__ import annotations import logging from dataclasses import asdict from pathlib import Path from typing import Any, TYPE_CHECKING from aiohttp import web from homeassistant.components.http import HomeAssistantView from .providers import fetch_readme_markdown_from_repo if TYPE_CHECKING: from .core import BCSCore # typing only _LOGGER = logging.getLogger(__name__) def _render_markdown_server_side(md: str) -> str | None: """Render Markdown -> sanitized HTML (server-side).""" text = (md or "").strip() if not text: return None html: str | None = None try: import markdown as mdlib # type: ignore html = mdlib.markdown( text, extensions=["fenced_code", "tables", "sane_lists", "toc"], output_format="html5", ) except Exception as e: _LOGGER.debug("python-markdown render failed: %s", e) html = None if not html: return None try: import bleach # type: ignore allowed_tags = [ "p", "br", "hr", "div", "span", "blockquote", "pre", "code", "h1", "h2", "h3", "h4", "h5", "h6", "ul", "ol", "li", "strong", "em", "b", "i", "u", "s", "a", "img", "table", "thead", "tbody", "tr", "th", "td", ] allowed_attrs = { "a": ["href", "title", "target", "rel"], "img": ["src", "alt", "title"], "th": ["align"], "td": ["align"], "*": ["class"], } sanitized = bleach.clean( html, tags=allowed_tags, attributes=allowed_attrs, protocols=["http", "https", "mailto"], strip=True, ) sanitized = sanitized.replace( ' web.Response: base = Path(__file__).resolve().parent / "panel" base_resolved = base.resolve() req_path = (path or "").lstrip("/") if req_path == "": req_path = "index.html" target = (base / req_path).resolve() if not str(target).startswith(str(base_resolved)): return web.Response(status=404) if target.is_dir(): target = (target / "index.html").resolve() if not target.exists(): _LOGGER.error("BCS static asset not found: %s", target) return web.Response(status=404) content_type = "text/plain" charset = None if target.suffix == ".js": content_type = "application/javascript" charset = "utf-8" elif target.suffix == ".html": content_type = "text/html" charset = "utf-8" elif target.suffix == ".css": content_type = "text/css" charset = "utf-8" elif target.suffix == ".svg": content_type = "image/svg+xml" elif target.suffix == ".png": content_type = "image/png" resp = web.Response(body=target.read_bytes(), content_type=content_type, charset=charset) resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" return resp class BCSApiView(HomeAssistantView): url = "/api/bcs" name = "api:bcs" requires_auth = True def __init__(self, core: Any) -> None: self.core = core async def get(self, request: web.Request) -> web.Response: return web.json_response( {"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()} ) async def post(self, request: web.Request) -> web.Response: data = await request.json() op = data.get("op") if op == "add_custom_repo": url = str(data.get("url") or "").strip() name = data.get("name") name = str(name).strip() if name else None if not url: return web.json_response({"ok": False, "message": "Missing url"}, status=400) repo = await self.core.add_custom_repo(url=url, name=name) return web.json_response({"ok": True, "repo": asdict(repo)}) return web.json_response({"ok": False, "message": "Unknown operation"}, status=400) class BCSCustomRepoView(HomeAssistantView): url = "/api/bcs/custom_repo" name = "api:bcs_custom_repo" requires_auth = True def __init__(self, core: Any) -> None: self.core = core async def delete(self, request: web.Request) -> web.Response: repo_id = request.query.get("id") if not repo_id: return web.json_response({"ok": False, "message": "Missing id"}, status=400) await self.core.remove_custom_repo(repo_id) return web.json_response({"ok": True}) class BCSReadmeView(HomeAssistantView): url = "/api/bcs/readme" name = "api:bcs_readme" requires_auth = True def __init__(self, core: Any) -> None: self.core = core async def get(self, request: web.Request) -> web.Response: repo_id = request.query.get("repo_id") if not repo_id: return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) repos = self.core.list_repos_public() repo = next((r for r in repos if str(r.get("id", "")) == str(repo_id)), None) if not repo: return web.json_response({"ok": False, "message": "Repository not found."}, status=404) repo_url = repo.get("url") provider = repo.get("provider") default_branch = repo.get("default_branch") if not isinstance(repo_url, str) or not repo_url.strip(): return web.json_response({"ok": False, "message": "Repository URL missing."}, status=404) md = await fetch_readme_markdown_from_repo( self.core.hass, repo_url=repo_url, provider=provider if isinstance(provider, str) else None, default_branch=default_branch if isinstance(default_branch, str) else None, ) if not md or not md.strip(): return web.json_response( { "ok": False, "message": "README not found (raw endpoint returned 404).", }, status=404, ) html = _render_markdown_server_side(md) return web.json_response({"ok": True, "readme": md, "html": html})