diff --git a/custom_components/bahmcloud_store/views.py b/custom_components/bahmcloud_store/views.py index fdb1b70..7a82caa 100644 --- a/custom_components/bahmcloud_store/views.py +++ b/custom_components/bahmcloud_store/views.py @@ -4,26 +4,20 @@ import base64 import logging from dataclasses import asdict from pathlib import Path -from typing import Any, TYPE_CHECKING +from typing import Any from aiohttp import web from homeassistant.components.http import HomeAssistantView -if TYPE_CHECKING: - from .core import BCSCore # typing only - _LOGGER = logging.getLogger(__name__) def _render_markdown_server_side(md: str) -> str | None: - """Render Markdown -> sanitized HTML (server-side).""" text = (md or "").strip() if not text: return None html: str | None = None - - # 1) python-markdown try: import markdown as mdlib # type: ignore @@ -39,44 +33,16 @@ def _render_markdown_server_side(md: str) -> str | None: if not html: return None - # 2) Sanitize via bleach try: import bleach # type: ignore allowed_tags = [ - "p", - "br", - "hr", - "div", - "span", - "blockquote", - "pre", - "code", - "h1", - "h2", - "h3", - "h4", - "h5", - "h6", - "ul", - "ol", - "li", - "strong", - "em", - "b", - "i", - "u", - "s", - "a", - "img", - "table", - "thead", - "tbody", - "tr", - "th", - "td", + "p", "br", "hr", "div", "span", "blockquote", "pre", "code", + "h1", "h2", "h3", "h4", "h5", "h6", + "ul", "ol", "li", "strong", "em", "b", "i", "u", "s", + "a", "img", + "table", "thead", "tbody", "tr", "th", "td", ] - allowed_attrs = { "a": ["href", "title", "target", "rel"], "img": ["src", "alt", "title"], @@ -92,104 +58,51 @@ def _render_markdown_server_side(md: str) -> str | None: protocols=["http", "https", "mailto"], strip=True, ) - sanitized = sanitized.replace( ' str | None: - if not isinstance(content, str): - return None - enc = "" - if isinstance(encoding, str): - enc = encoding.strip().lower() - if "base64" not in enc: - return None - try: - raw = base64.b64decode(content.encode("utf-8"), validate=False) - return raw.decode("utf-8", errors="replace") except Exception: - return None + return html -def _extract_text_recursive(obj: Any, depth: int = 0) -> str | None: - """ - Robust extraction for README markdown. - - Handles: - - str / bytes - - dict with: - - {content: "...", encoding: "base64"} (possibly nested) - - {readme: "..."} etc. - - list of dicts (pick first matching) - """ +def _extract_text(obj: Any) -> str | None: if obj is None: return None - + if isinstance(obj, str): + return obj if isinstance(obj, bytes): try: return obj.decode("utf-8", errors="replace") except Exception: return None - - if isinstance(obj, str): - return obj - - if depth > 8: - return None - if isinstance(obj, dict): - # 1) If it looks like "file content" + # gitea style: {"content":"...", "encoding":"base64"} content = obj.get("content") - encoding = obj.get("encoding") - - # Base64 decode if possible - decoded = _maybe_decode_base64(content, encoding) - if decoded: - return decoded - - # content may already be plain text - if isinstance(content, str) and (not isinstance(encoding, str) or not encoding.strip()): - # Heuristic: treat as markdown if it has typical markdown chars, otherwise still return - return content - - # 2) direct text keys (readme/markdown/text/body/data) - for k in _TEXT_KEYS: + enc = str(obj.get("encoding") or "").lower() + if isinstance(content, str) and "base64" in enc: + try: + raw = base64.b64decode(content.encode("utf-8"), validate=False) + return raw.decode("utf-8", errors="replace") + except Exception: + pass + for k in ("readme", "markdown", "text", "content", "body", "data"): v = obj.get(k) if isinstance(v, str): return v - if isinstance(v, bytes): - try: - return v.decode("utf-8", errors="replace") - except Exception: - pass - - # 3) Sometimes nested under "file" / "result" / "payload" etc. + # search nested for v in obj.values(): - out = _extract_text_recursive(v, depth + 1) - if out: - return out - + t = _extract_text(v) + if t: + return t return None - if isinstance(obj, list): - for item in obj: - out = _extract_text_recursive(item, depth + 1) - if out: - return out + for it in obj: + t = _extract_text(it) + if t: + return t return None - return None @@ -200,15 +113,10 @@ class StaticAssetsView(HomeAssistantView): async def get(self, request: web.Request, path: str) -> web.Response: base = Path(__file__).resolve().parent / "panel" - base_resolved = base.resolve() + req = (path or "").lstrip("/") or "index.html" + target = (base / req).resolve() - req_path = (path or "").lstrip("/") - if req_path == "": - req_path = "index.html" - - target = (base / req_path).resolve() - - if not str(target).startswith(str(base_resolved)): + if not str(target).startswith(str(base.resolve())): return web.Response(status=404) if target.is_dir(): @@ -218,24 +126,20 @@ class StaticAssetsView(HomeAssistantView): _LOGGER.error("BCS static asset not found: %s", target) return web.Response(status=404) - content_type = "text/plain" + ct = "text/plain" charset = None - if target.suffix == ".js": - content_type = "application/javascript" - charset = "utf-8" + ct, charset = "application/javascript", "utf-8" elif target.suffix == ".html": - content_type = "text/html" - charset = "utf-8" + ct, charset = "text/html", "utf-8" elif target.suffix == ".css": - content_type = "text/css" - charset = "utf-8" + ct, charset = "text/css", "utf-8" elif target.suffix == ".svg": - content_type = "image/svg+xml" + ct = "image/svg+xml" elif target.suffix == ".png": - content_type = "image/png" + ct = "image/png" - resp = web.Response(body=target.read_bytes(), content_type=content_type, charset=charset) + resp = web.Response(body=target.read_bytes(), content_type=ct, charset=charset) resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" resp.headers["Pragma"] = "no-cache" return resp @@ -246,13 +150,11 @@ class BCSApiView(HomeAssistantView): name = "api:bcs" requires_auth = True - def __init__(self, core: Any) -> None: + def __init__(self, core) -> None: self.core = core async def get(self, request: web.Request) -> web.Response: - return web.json_response( - {"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()} - ) + return web.json_response({"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()}) async def post(self, request: web.Request) -> web.Response: data = await request.json() @@ -275,7 +177,7 @@ class BCSCustomRepoView(HomeAssistantView): name = "api:bcs_custom_repo" requires_auth = True - def __init__(self, core: Any) -> None: + def __init__(self, core) -> None: self.core = core async def delete(self, request: web.Request) -> web.Response: @@ -291,7 +193,7 @@ class BCSReadmeView(HomeAssistantView): name = "api:bcs_readme" requires_auth = True - def __init__(self, core: Any) -> None: + def __init__(self, core) -> None: self.core = core async def get(self, request: web.Request) -> web.Response: @@ -299,18 +201,10 @@ class BCSReadmeView(HomeAssistantView): if not repo_id: return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) - maybe_md = await self.core.fetch_readme_markdown(repo_id) - - md = _extract_text_recursive(maybe_md) + maybe = await self.core.fetch_readme_markdown(repo_id) + md = _extract_text(maybe) if not md or not md.strip(): - t = type(maybe_md).__name__ - return web.json_response( - {"ok": False, "message": f"README not found or unsupported format (got {t})."}, - status=404, - ) + return web.json_response({"ok": False, "message": "README not found."}, status=404) - # Ensure strict JSON string output (avoid accidental objects) - md_str = str(md) - - html = _render_markdown_server_side(md_str) - return web.json_response({"ok": True, "readme": md_str, "html": html}) + html = _render_markdown_server_side(md) + return web.json_response({"ok": True, "readme": md, "html": html})