diff --git a/custom_components/bahmcloud_store/views.py b/custom_components/bahmcloud_store/views.py index 7a82caa..fdb1b70 100644 --- a/custom_components/bahmcloud_store/views.py +++ b/custom_components/bahmcloud_store/views.py @@ -4,20 +4,26 @@ import base64 import logging from dataclasses import asdict from pathlib import Path -from typing import Any +from typing import Any, TYPE_CHECKING from aiohttp import web from homeassistant.components.http import HomeAssistantView +if TYPE_CHECKING: + from .core import BCSCore # typing only + _LOGGER = logging.getLogger(__name__) def _render_markdown_server_side(md: str) -> str | None: + """Render Markdown -> sanitized HTML (server-side).""" text = (md or "").strip() if not text: return None html: str | None = None + + # 1) python-markdown try: import markdown as mdlib # type: ignore @@ -33,16 +39,44 @@ def _render_markdown_server_side(md: str) -> str | None: if not html: return None + # 2) Sanitize via bleach try: import bleach # type: ignore allowed_tags = [ - "p", "br", "hr", "div", "span", "blockquote", "pre", "code", - "h1", "h2", "h3", "h4", "h5", "h6", - "ul", "ol", "li", "strong", "em", "b", "i", "u", "s", - "a", "img", - "table", "thead", "tbody", "tr", "th", "td", + "p", + "br", + "hr", + "div", + "span", + "blockquote", + "pre", + "code", + "h1", + "h2", + "h3", + "h4", + "h5", + "h6", + "ul", + "ol", + "li", + "strong", + "em", + "b", + "i", + "u", + "s", + "a", + "img", + "table", + "thead", + "tbody", + "tr", + "th", + "td", ] + allowed_attrs = { "a": ["href", "title", "target", "rel"], "img": ["src", "alt", "title"], @@ -58,51 +92,104 @@ def _render_markdown_server_side(md: str) -> str | None: protocols=["http", "https", "mailto"], strip=True, ) + sanitized = sanitized.replace( ' str | None: + if not isinstance(content, str): + return None + enc = "" + if isinstance(encoding, str): + enc = encoding.strip().lower() + if "base64" not in enc: + return None + try: + raw = base64.b64decode(content.encode("utf-8"), validate=False) + return raw.decode("utf-8", errors="replace") except Exception: - return html + return None -def _extract_text(obj: Any) -> str | None: +def _extract_text_recursive(obj: Any, depth: int = 0) -> str | None: + """ + Robust extraction for README markdown. + + Handles: + - str / bytes + - dict with: + - {content: "...", encoding: "base64"} (possibly nested) + - {readme: "..."} etc. + - list of dicts (pick first matching) + """ if obj is None: return None - if isinstance(obj, str): - return obj + if isinstance(obj, bytes): try: return obj.decode("utf-8", errors="replace") except Exception: return None + + if isinstance(obj, str): + return obj + + if depth > 8: + return None + if isinstance(obj, dict): - # gitea style: {"content":"...", "encoding":"base64"} + # 1) If it looks like "file content" content = obj.get("content") - enc = str(obj.get("encoding") or "").lower() - if isinstance(content, str) and "base64" in enc: - try: - raw = base64.b64decode(content.encode("utf-8"), validate=False) - return raw.decode("utf-8", errors="replace") - except Exception: - pass - for k in ("readme", "markdown", "text", "content", "body", "data"): + encoding = obj.get("encoding") + + # Base64 decode if possible + decoded = _maybe_decode_base64(content, encoding) + if decoded: + return decoded + + # content may already be plain text + if isinstance(content, str) and (not isinstance(encoding, str) or not encoding.strip()): + # Heuristic: treat as markdown if it has typical markdown chars, otherwise still return + return content + + # 2) direct text keys (readme/markdown/text/body/data) + for k in _TEXT_KEYS: v = obj.get(k) if isinstance(v, str): return v - # search nested + if isinstance(v, bytes): + try: + return v.decode("utf-8", errors="replace") + except Exception: + pass + + # 3) Sometimes nested under "file" / "result" / "payload" etc. for v in obj.values(): - t = _extract_text(v) - if t: - return t + out = _extract_text_recursive(v, depth + 1) + if out: + return out + return None + if isinstance(obj, list): - for it in obj: - t = _extract_text(it) - if t: - return t + for item in obj: + out = _extract_text_recursive(item, depth + 1) + if out: + return out return None + return None @@ -113,10 +200,15 @@ class StaticAssetsView(HomeAssistantView): async def get(self, request: web.Request, path: str) -> web.Response: base = Path(__file__).resolve().parent / "panel" - req = (path or "").lstrip("/") or "index.html" - target = (base / req).resolve() + base_resolved = base.resolve() - if not str(target).startswith(str(base.resolve())): + req_path = (path or "").lstrip("/") + if req_path == "": + req_path = "index.html" + + target = (base / req_path).resolve() + + if not str(target).startswith(str(base_resolved)): return web.Response(status=404) if target.is_dir(): @@ -126,20 +218,24 @@ class StaticAssetsView(HomeAssistantView): _LOGGER.error("BCS static asset not found: %s", target) return web.Response(status=404) - ct = "text/plain" + content_type = "text/plain" charset = None - if target.suffix == ".js": - ct, charset = "application/javascript", "utf-8" - elif target.suffix == ".html": - ct, charset = "text/html", "utf-8" - elif target.suffix == ".css": - ct, charset = "text/css", "utf-8" - elif target.suffix == ".svg": - ct = "image/svg+xml" - elif target.suffix == ".png": - ct = "image/png" - resp = web.Response(body=target.read_bytes(), content_type=ct, charset=charset) + if target.suffix == ".js": + content_type = "application/javascript" + charset = "utf-8" + elif target.suffix == ".html": + content_type = "text/html" + charset = "utf-8" + elif target.suffix == ".css": + content_type = "text/css" + charset = "utf-8" + elif target.suffix == ".svg": + content_type = "image/svg+xml" + elif target.suffix == ".png": + content_type = "image/png" + + resp = web.Response(body=target.read_bytes(), content_type=content_type, charset=charset) resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" return resp @@ -150,11 +246,13 @@ class BCSApiView(HomeAssistantView): name = "api:bcs" requires_auth = True - def __init__(self, core) -> None: + def __init__(self, core: Any) -> None: self.core = core async def get(self, request: web.Request) -> web.Response: - return web.json_response({"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()}) + return web.json_response( + {"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()} + ) async def post(self, request: web.Request) -> web.Response: data = await request.json() @@ -177,7 +275,7 @@ class BCSCustomRepoView(HomeAssistantView): name = "api:bcs_custom_repo" requires_auth = True - def __init__(self, core) -> None: + def __init__(self, core: Any) -> None: self.core = core async def delete(self, request: web.Request) -> web.Response: @@ -193,7 +291,7 @@ class BCSReadmeView(HomeAssistantView): name = "api:bcs_readme" requires_auth = True - def __init__(self, core) -> None: + def __init__(self, core: Any) -> None: self.core = core async def get(self, request: web.Request) -> web.Response: @@ -201,10 +299,18 @@ class BCSReadmeView(HomeAssistantView): if not repo_id: return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) - maybe = await self.core.fetch_readme_markdown(repo_id) - md = _extract_text(maybe) - if not md or not md.strip(): - return web.json_response({"ok": False, "message": "README not found."}, status=404) + maybe_md = await self.core.fetch_readme_markdown(repo_id) - html = _render_markdown_server_side(md) - return web.json_response({"ok": True, "readme": md, "html": html}) + md = _extract_text_recursive(maybe_md) + if not md or not md.strip(): + t = type(maybe_md).__name__ + return web.json_response( + {"ok": False, "message": f"README not found or unsupported format (got {t})."}, + status=404, + ) + + # Ensure strict JSON string output (avoid accidental objects) + md_str = str(md) + + html = _render_markdown_server_side(md_str) + return web.json_response({"ok": True, "readme": md_str, "html": html})