from __future__ import annotations import base64 import logging from dataclasses import asdict from pathlib import Path from typing import Any, TYPE_CHECKING from aiohttp import web from homeassistant.components.http import HomeAssistantView if TYPE_CHECKING: from .core import BCSCore # typing only _LOGGER = logging.getLogger(__name__) def _render_markdown_server_side(md: str) -> str | None: text = (md or "").strip() if not text: return None html: str | None = None try: import markdown as mdlib # type: ignore html = mdlib.markdown( text, extensions=["fenced_code", "tables", "sane_lists", "toc"], output_format="html5", ) except Exception as e: _LOGGER.debug("python-markdown render failed: %s", e) html = None if not html: return None try: import bleach # type: ignore allowed_tags = [ "p", "br", "hr", "div", "span", "blockquote", "pre", "code", "h1", "h2", "h3", "h4", "h5", "h6", "ul", "ol", "li", "strong", "em", "b", "i", "u", "s", "a", "img", "table", "thead", "tbody", "tr", "th", "td", ] allowed_attrs = { "a": ["href", "title", "target", "rel"], "img": ["src", "alt", "title"], "th": ["align"], "td": ["align"], "*": ["class"], } sanitized = bleach.clean( html, tags=allowed_tags, attributes=allowed_attrs, protocols=["http", "https", "mailto"], strip=True, ) sanitized = sanitized.replace( ' str | None: if not isinstance(content, str): return None enc = "" if isinstance(encoding, str): enc = encoding.strip().lower() if "base64" not in enc: return None try: raw = base64.b64decode(content.encode("utf-8"), validate=False) return raw.decode("utf-8", errors="replace") except Exception: return None def _extract_text_recursive(obj: Any, depth: int = 0) -> str | None: if obj is None: return None if isinstance(obj, bytes): try: return obj.decode("utf-8", errors="replace") except Exception: return None if isinstance(obj, str): return obj if depth > 8: return None if isinstance(obj, dict): content = obj.get("content") encoding = obj.get("encoding") decoded = _maybe_decode_base64(content, encoding) if decoded: return decoded if isinstance(content, str) and (not isinstance(encoding, str) or not encoding.strip()): return content for k in _TEXT_KEYS: v = obj.get(k) if isinstance(v, str): return v if isinstance(v, bytes): try: return v.decode("utf-8", errors="replace") except Exception: pass for v in obj.values(): out = _extract_text_recursive(v, depth + 1) if out: return out return None if isinstance(obj, list): for item in obj: out = _extract_text_recursive(item, depth + 1) if out: return out return None return None class StaticAssetsView(HomeAssistantView): url = "/api/bahmcloud_store_static/{path:.*}" name = "api:bahmcloud_store_static" requires_auth = False async def get(self, request: web.Request, path: str) -> web.StreamResponse: base = Path(__file__).resolve().parent / "panel" base_resolved = base.resolve() req_path = (path or "").lstrip("/") if req_path == "": req_path = "index.html" target = (base / req_path).resolve() if not str(target).startswith(str(base_resolved)): return web.Response(status=404) if target.is_dir(): target = (target / "index.html").resolve() if not target.exists(): _LOGGER.error("BCS static asset not found: %s", target) return web.Response(status=404) resp = web.FileResponse(path=target) resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" return resp class BCSApiView(HomeAssistantView): url = "/api/bcs" name = "api:bcs" requires_auth = True def __init__(self, core: Any) -> None: self.core: BCSCore = core async def get(self, request: web.Request) -> web.Response: return web.json_response( {"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()} ) async def post(self, request: web.Request) -> web.Response: action = request.query.get("action") if action == "refresh": _LOGGER.info("BCS manual refresh triggered via API") try: await self.core.full_refresh(source="manual") return web.json_response({"ok": True}) except Exception as e: _LOGGER.error("BCS manual refresh failed: %s", e) return web.json_response({"ok": False, "message": "Refresh failed"}, status=500) try: data = await request.json() except Exception: data = {} op = data.get("op") if op == "add_custom_repo": url = str(data.get("url") or "").strip() name = data.get("name") name = str(name).strip() if name else None if not url: return web.json_response({"ok": False, "message": "Missing url"}, status=400) repo = await self.core.add_custom_repo(url=url, name=name) return web.json_response({"ok": True, "repo": asdict(repo)}) return web.json_response({"ok": False, "message": "Unknown operation"}, status=400) class BCSCustomRepoView(HomeAssistantView): url = "/api/bcs/custom_repo" name = "api:bcs_custom_repo" requires_auth = True def __init__(self, core: Any) -> None: self.core: BCSCore = core async def delete(self, request: web.Request) -> web.Response: repo_id = request.query.get("id") if not repo_id: return web.json_response({"ok": False, "message": "Missing id"}, status=400) await self.core.remove_custom_repo(repo_id) return web.json_response({"ok": True}) class BCSReadmeView(HomeAssistantView): url = "/api/bcs/readme" name = "api:bcs_readme" requires_auth = True def __init__(self, core: Any) -> None: self.core: BCSCore = core async def get(self, request: web.Request) -> web.Response: repo_id = request.query.get("repo_id") if not repo_id: return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) maybe_md = await self.core.fetch_readme_markdown(repo_id) md = _extract_text_recursive(maybe_md) if not md or not md.strip(): t = type(maybe_md).__name__ return web.json_response( {"ok": False, "message": f"README not found or unsupported format (got {t})."}, status=404, ) md_str = str(md) html = _render_markdown_server_side(md_str) return web.json_response({"ok": True, "readme": md_str, "html": html})