revert custom_components/bahmcloud_store/views.py aktualisiert
This commit is contained in:
2026-01-15 14:38:54 +00:00
parent 6a0132a25c
commit 8ac0ef103c

View File

@@ -4,20 +4,26 @@ import base64
import logging import logging
from dataclasses import asdict from dataclasses import asdict
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any, TYPE_CHECKING
from aiohttp import web from aiohttp import web
from homeassistant.components.http import HomeAssistantView from homeassistant.components.http import HomeAssistantView
if TYPE_CHECKING:
from .core import BCSCore # typing only
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
def _render_markdown_server_side(md: str) -> str | None: def _render_markdown_server_side(md: str) -> str | None:
"""Render Markdown -> sanitized HTML (server-side)."""
text = (md or "").strip() text = (md or "").strip()
if not text: if not text:
return None return None
html: str | None = None html: str | None = None
# 1) python-markdown
try: try:
import markdown as mdlib # type: ignore import markdown as mdlib # type: ignore
@@ -33,16 +39,44 @@ def _render_markdown_server_side(md: str) -> str | None:
if not html: if not html:
return None return None
# 2) Sanitize via bleach
try: try:
import bleach # type: ignore import bleach # type: ignore
allowed_tags = [ allowed_tags = [
"p", "br", "hr", "div", "span", "blockquote", "pre", "code", "p",
"h1", "h2", "h3", "h4", "h5", "h6", "br",
"ul", "ol", "li", "strong", "em", "b", "i", "u", "s", "hr",
"a", "img", "div",
"table", "thead", "tbody", "tr", "th", "td", "span",
"blockquote",
"pre",
"code",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
"ul",
"ol",
"li",
"strong",
"em",
"b",
"i",
"u",
"s",
"a",
"img",
"table",
"thead",
"tbody",
"tr",
"th",
"td",
] ]
allowed_attrs = { allowed_attrs = {
"a": ["href", "title", "target", "rel"], "a": ["href", "title", "target", "rel"],
"img": ["src", "alt", "title"], "img": ["src", "alt", "title"],
@@ -58,51 +92,104 @@ def _render_markdown_server_side(md: str) -> str | None:
protocols=["http", "https", "mailto"], protocols=["http", "https", "mailto"],
strip=True, strip=True,
) )
sanitized = sanitized.replace( sanitized = sanitized.replace(
'<a href="', '<a href="',
'<a rel="noreferrer noopener" target="_blank" href="', '<a rel="noreferrer noopener" target="_blank" href="',
) )
return sanitized return sanitized
except Exception:
except Exception as e:
_LOGGER.debug("bleach sanitize failed/unavailable: %s", e)
return html return html
def _extract_text(obj: Any) -> str | None: _TEXT_KEYS = ("readme", "markdown", "text", "content", "data", "body")
def _maybe_decode_base64(content: str, encoding: Any) -> str | None:
if not isinstance(content, str):
return None
enc = ""
if isinstance(encoding, str):
enc = encoding.strip().lower()
if "base64" not in enc:
return None
try:
raw = base64.b64decode(content.encode("utf-8"), validate=False)
return raw.decode("utf-8", errors="replace")
except Exception:
return None
def _extract_text_recursive(obj: Any, depth: int = 0) -> str | None:
"""
Robust extraction for README markdown.
Handles:
- str / bytes
- dict with:
- {content: "...", encoding: "base64"} (possibly nested)
- {readme: "..."} etc.
- list of dicts (pick first matching)
"""
if obj is None: if obj is None:
return None return None
if isinstance(obj, str):
return obj
if isinstance(obj, bytes): if isinstance(obj, bytes):
try: try:
return obj.decode("utf-8", errors="replace") return obj.decode("utf-8", errors="replace")
except Exception: except Exception:
return None return None
if isinstance(obj, str):
return obj
if depth > 8:
return None
if isinstance(obj, dict): if isinstance(obj, dict):
# gitea style: {"content":"...", "encoding":"base64"} # 1) If it looks like "file content"
content = obj.get("content") content = obj.get("content")
enc = str(obj.get("encoding") or "").lower() encoding = obj.get("encoding")
if isinstance(content, str) and "base64" in enc:
try: # Base64 decode if possible
raw = base64.b64decode(content.encode("utf-8"), validate=False) decoded = _maybe_decode_base64(content, encoding)
return raw.decode("utf-8", errors="replace") if decoded:
except Exception: return decoded
pass
for k in ("readme", "markdown", "text", "content", "body", "data"): # content may already be plain text
if isinstance(content, str) and (not isinstance(encoding, str) or not encoding.strip()):
# Heuristic: treat as markdown if it has typical markdown chars, otherwise still return
return content
# 2) direct text keys (readme/markdown/text/body/data)
for k in _TEXT_KEYS:
v = obj.get(k) v = obj.get(k)
if isinstance(v, str): if isinstance(v, str):
return v return v
# search nested if isinstance(v, bytes):
try:
return v.decode("utf-8", errors="replace")
except Exception:
pass
# 3) Sometimes nested under "file" / "result" / "payload" etc.
for v in obj.values(): for v in obj.values():
t = _extract_text(v) out = _extract_text_recursive(v, depth + 1)
if t: if out:
return t return out
return None return None
if isinstance(obj, list): if isinstance(obj, list):
for it in obj: for item in obj:
t = _extract_text(it) out = _extract_text_recursive(item, depth + 1)
if t: if out:
return t return out
return None return None
return None return None
@@ -113,10 +200,15 @@ class StaticAssetsView(HomeAssistantView):
async def get(self, request: web.Request, path: str) -> web.Response: async def get(self, request: web.Request, path: str) -> web.Response:
base = Path(__file__).resolve().parent / "panel" base = Path(__file__).resolve().parent / "panel"
req = (path or "").lstrip("/") or "index.html" base_resolved = base.resolve()
target = (base / req).resolve()
if not str(target).startswith(str(base.resolve())): req_path = (path or "").lstrip("/")
if req_path == "":
req_path = "index.html"
target = (base / req_path).resolve()
if not str(target).startswith(str(base_resolved)):
return web.Response(status=404) return web.Response(status=404)
if target.is_dir(): if target.is_dir():
@@ -126,20 +218,24 @@ class StaticAssetsView(HomeAssistantView):
_LOGGER.error("BCS static asset not found: %s", target) _LOGGER.error("BCS static asset not found: %s", target)
return web.Response(status=404) return web.Response(status=404)
ct = "text/plain" content_type = "text/plain"
charset = None charset = None
if target.suffix == ".js":
ct, charset = "application/javascript", "utf-8"
elif target.suffix == ".html":
ct, charset = "text/html", "utf-8"
elif target.suffix == ".css":
ct, charset = "text/css", "utf-8"
elif target.suffix == ".svg":
ct = "image/svg+xml"
elif target.suffix == ".png":
ct = "image/png"
resp = web.Response(body=target.read_bytes(), content_type=ct, charset=charset) if target.suffix == ".js":
content_type = "application/javascript"
charset = "utf-8"
elif target.suffix == ".html":
content_type = "text/html"
charset = "utf-8"
elif target.suffix == ".css":
content_type = "text/css"
charset = "utf-8"
elif target.suffix == ".svg":
content_type = "image/svg+xml"
elif target.suffix == ".png":
content_type = "image/png"
resp = web.Response(body=target.read_bytes(), content_type=content_type, charset=charset)
resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
resp.headers["Pragma"] = "no-cache" resp.headers["Pragma"] = "no-cache"
return resp return resp
@@ -150,11 +246,13 @@ class BCSApiView(HomeAssistantView):
name = "api:bcs" name = "api:bcs"
requires_auth = True requires_auth = True
def __init__(self, core) -> None: def __init__(self, core: Any) -> None:
self.core = core self.core = core
async def get(self, request: web.Request) -> web.Response: async def get(self, request: web.Request) -> web.Response:
return web.json_response({"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()}) return web.json_response(
{"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()}
)
async def post(self, request: web.Request) -> web.Response: async def post(self, request: web.Request) -> web.Response:
data = await request.json() data = await request.json()
@@ -177,7 +275,7 @@ class BCSCustomRepoView(HomeAssistantView):
name = "api:bcs_custom_repo" name = "api:bcs_custom_repo"
requires_auth = True requires_auth = True
def __init__(self, core) -> None: def __init__(self, core: Any) -> None:
self.core = core self.core = core
async def delete(self, request: web.Request) -> web.Response: async def delete(self, request: web.Request) -> web.Response:
@@ -193,7 +291,7 @@ class BCSReadmeView(HomeAssistantView):
name = "api:bcs_readme" name = "api:bcs_readme"
requires_auth = True requires_auth = True
def __init__(self, core) -> None: def __init__(self, core: Any) -> None:
self.core = core self.core = core
async def get(self, request: web.Request) -> web.Response: async def get(self, request: web.Request) -> web.Response:
@@ -201,10 +299,18 @@ class BCSReadmeView(HomeAssistantView):
if not repo_id: if not repo_id:
return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400)
maybe = await self.core.fetch_readme_markdown(repo_id) maybe_md = await self.core.fetch_readme_markdown(repo_id)
md = _extract_text(maybe)
if not md or not md.strip():
return web.json_response({"ok": False, "message": "README not found."}, status=404)
html = _render_markdown_server_side(md) md = _extract_text_recursive(maybe_md)
return web.json_response({"ok": True, "readme": md, "html": html}) if not md or not md.strip():
t = type(maybe_md).__name__
return web.json_response(
{"ok": False, "message": f"README not found or unsupported format (got {t})."},
status=404,
)
# Ensure strict JSON string output (avoid accidental objects)
md_str = str(md)
html = _render_markdown_server_side(md_str)
return web.json_response({"ok": True, "readme": md_str, "html": html})