custom_components/bahmcloud_store/views.py aktualisiert
This commit is contained in:
@@ -1,292 +1,85 @@
|
||||
"""HTTP views for Bahmcloud Store (BCS).
|
||||
|
||||
This module exposes the internal store state via a small REST API and serves
|
||||
static panel assets.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import logging
|
||||
from dataclasses import asdict
|
||||
from pathlib import Path
|
||||
from typing import Any, TYPE_CHECKING
|
||||
from typing import Any
|
||||
|
||||
from aiohttp import web
|
||||
from homeassistant.components.http import HomeAssistantView
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .core import BCSCore # typing only
|
||||
from homeassistant.components.http import HomeAssistantView
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
from .const import DOMAIN
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _render_markdown_server_side(md: str) -> str | None:
|
||||
text = (md or "").strip()
|
||||
if not text:
|
||||
return None
|
||||
|
||||
html: str | None = None
|
||||
|
||||
try:
|
||||
import markdown as mdlib # type: ignore
|
||||
|
||||
html = mdlib.markdown(
|
||||
text,
|
||||
extensions=["fenced_code", "tables", "sane_lists", "toc"],
|
||||
output_format="html5",
|
||||
)
|
||||
except Exception as e:
|
||||
_LOGGER.debug("python-markdown render failed: %s", e)
|
||||
html = None
|
||||
|
||||
if not html:
|
||||
return None
|
||||
|
||||
try:
|
||||
import bleach # type: ignore
|
||||
|
||||
allowed_tags = [
|
||||
"p",
|
||||
"br",
|
||||
"hr",
|
||||
"div",
|
||||
"span",
|
||||
"blockquote",
|
||||
"pre",
|
||||
"code",
|
||||
"h1",
|
||||
"h2",
|
||||
"h3",
|
||||
"h4",
|
||||
"h5",
|
||||
"h6",
|
||||
"ul",
|
||||
"ol",
|
||||
"li",
|
||||
"strong",
|
||||
"em",
|
||||
"b",
|
||||
"i",
|
||||
"u",
|
||||
"s",
|
||||
"a",
|
||||
"img",
|
||||
"table",
|
||||
"thead",
|
||||
"tbody",
|
||||
"tr",
|
||||
"th",
|
||||
"td",
|
||||
]
|
||||
|
||||
allowed_attrs = {
|
||||
"a": ["href", "title", "target", "rel"],
|
||||
"img": ["src", "alt", "title"],
|
||||
"th": ["align"],
|
||||
"td": ["align"],
|
||||
"*": ["class"],
|
||||
}
|
||||
|
||||
sanitized = bleach.clean(
|
||||
html,
|
||||
tags=allowed_tags,
|
||||
attributes=allowed_attrs,
|
||||
protocols=["http", "https", "mailto"],
|
||||
strip=True,
|
||||
)
|
||||
|
||||
sanitized = sanitized.replace(
|
||||
'<a href="',
|
||||
'<a rel="noreferrer noopener" target="_blank" href="',
|
||||
)
|
||||
return sanitized
|
||||
|
||||
except Exception as e:
|
||||
_LOGGER.debug("bleach sanitize failed/unavailable: %s", e)
|
||||
|
||||
return html
|
||||
def _get_core(hass: HomeAssistant):
|
||||
data = hass.data.get(DOMAIN)
|
||||
if not data or "core" not in data:
|
||||
raise RuntimeError("BCS core not initialized")
|
||||
return data["core"]
|
||||
|
||||
|
||||
_TEXT_KEYS = ("readme", "markdown", "text", "content", "data", "body")
|
||||
class BcsBaseView(HomeAssistantView):
|
||||
"""Base view with shared helpers."""
|
||||
|
||||
|
||||
def _maybe_decode_base64(content: str, encoding: Any) -> str | None:
|
||||
if not isinstance(content, str):
|
||||
return None
|
||||
enc = ""
|
||||
if isinstance(encoding, str):
|
||||
enc = encoding.strip().lower()
|
||||
if "base64" not in enc:
|
||||
return None
|
||||
try:
|
||||
raw = base64.b64decode(content.encode("utf-8"), validate=False)
|
||||
return raw.decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def _extract_text_recursive(obj: Any, depth: int = 0) -> str | None:
|
||||
if obj is None:
|
||||
return None
|
||||
|
||||
if isinstance(obj, bytes):
|
||||
try:
|
||||
return obj.decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
if isinstance(obj, str):
|
||||
return obj
|
||||
|
||||
if depth > 8:
|
||||
return None
|
||||
|
||||
if isinstance(obj, dict):
|
||||
content = obj.get("content")
|
||||
encoding = obj.get("encoding")
|
||||
|
||||
decoded = _maybe_decode_base64(content, encoding)
|
||||
if decoded:
|
||||
return decoded
|
||||
|
||||
if isinstance(content, str) and (not isinstance(encoding, str) or not encoding.strip()):
|
||||
return content
|
||||
|
||||
for k in _TEXT_KEYS:
|
||||
v = obj.get(k)
|
||||
if isinstance(v, str):
|
||||
return v
|
||||
if isinstance(v, bytes):
|
||||
try:
|
||||
return v.decode("utf-8", errors="replace")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for v in obj.values():
|
||||
out = _extract_text_recursive(v, depth + 1)
|
||||
if out:
|
||||
return out
|
||||
|
||||
return None
|
||||
|
||||
if isinstance(obj, list):
|
||||
for item in obj:
|
||||
out = _extract_text_recursive(item, depth + 1)
|
||||
if out:
|
||||
return out
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
|
||||
class StaticAssetsView(HomeAssistantView):
|
||||
url = "/api/bahmcloud_store_static/{path:.*}"
|
||||
name = "api:bahmcloud_store_static"
|
||||
requires_auth = False
|
||||
|
||||
async def get(self, request: web.Request, path: str) -> web.StreamResponse:
|
||||
base = Path(__file__).resolve().parent / "panel"
|
||||
base_resolved = base.resolve()
|
||||
|
||||
req_path = (path or "").lstrip("/")
|
||||
if req_path == "":
|
||||
req_path = "index.html"
|
||||
|
||||
target = (base / req_path).resolve()
|
||||
|
||||
if not str(target).startswith(str(base_resolved)):
|
||||
return web.Response(status=404)
|
||||
|
||||
if target.is_dir():
|
||||
target = (target / "index.html").resolve()
|
||||
|
||||
if not target.exists():
|
||||
_LOGGER.error("BCS static asset not found: %s", target)
|
||||
return web.Response(status=404)
|
||||
|
||||
resp = web.FileResponse(path=target)
|
||||
resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
|
||||
resp.headers["Pragma"] = "no-cache"
|
||||
return resp
|
||||
|
||||
|
||||
class BCSApiView(HomeAssistantView):
|
||||
url = "/api/bcs"
|
||||
name = "api:bcs"
|
||||
requires_auth = True
|
||||
|
||||
def __init__(self, core: Any) -> None:
|
||||
self.core: BCSCore = core
|
||||
def json(self, payload: dict[str, Any], status: int = 200) -> web.Response:
|
||||
return web.json_response(payload, status=status)
|
||||
|
||||
async def get(self, request: web.Request) -> web.Response:
|
||||
return web.json_response(
|
||||
{"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()}
|
||||
)
|
||||
def error(self, message: str, status: int = 400) -> web.Response:
|
||||
return self.json({"ok": False, "error": message}, status=status)
|
||||
|
||||
|
||||
class BcsInstallDryRunView(BcsBaseView):
|
||||
"""POST /api/bcs/install?repo_id=... (dry-run)."""
|
||||
|
||||
url = "/api/bcs/install"
|
||||
name = "api:bcs:install"
|
||||
|
||||
async def post(self, request: web.Request) -> web.Response:
|
||||
action = request.query.get("action")
|
||||
if action == "refresh":
|
||||
_LOGGER.info("BCS manual refresh triggered via API")
|
||||
try:
|
||||
await self.core.full_refresh(source="manual")
|
||||
return web.json_response({"ok": True})
|
||||
except Exception as e:
|
||||
_LOGGER.error("BCS manual refresh failed: %s", e)
|
||||
return web.json_response({"ok": False, "message": "Refresh failed"}, status=500)
|
||||
hass: HomeAssistant = request.app["hass"]
|
||||
core = _get_core(hass)
|
||||
|
||||
try:
|
||||
data = await request.json()
|
||||
except Exception:
|
||||
data = {}
|
||||
|
||||
op = data.get("op")
|
||||
|
||||
if op == "add_custom_repo":
|
||||
url = str(data.get("url") or "").strip()
|
||||
name = data.get("name")
|
||||
name = str(name).strip() if name else None
|
||||
if not url:
|
||||
return web.json_response({"ok": False, "message": "Missing url"}, status=400)
|
||||
repo = await self.core.add_custom_repo(url=url, name=name)
|
||||
return web.json_response({"ok": True, "repo": asdict(repo)})
|
||||
|
||||
return web.json_response({"ok": False, "message": "Unknown operation"}, status=400)
|
||||
|
||||
|
||||
class BCSCustomRepoView(HomeAssistantView):
|
||||
url = "/api/bcs/custom_repo"
|
||||
name = "api:bcs_custom_repo"
|
||||
requires_auth = True
|
||||
|
||||
def __init__(self, core: Any) -> None:
|
||||
self.core: BCSCore = core
|
||||
|
||||
async def delete(self, request: web.Request) -> web.Response:
|
||||
repo_id = request.query.get("id")
|
||||
if not repo_id:
|
||||
return web.json_response({"ok": False, "message": "Missing id"}, status=400)
|
||||
await self.core.remove_custom_repo(repo_id)
|
||||
return web.json_response({"ok": True})
|
||||
|
||||
|
||||
class BCSReadmeView(HomeAssistantView):
|
||||
url = "/api/bcs/readme"
|
||||
name = "api:bcs_readme"
|
||||
requires_auth = True
|
||||
|
||||
def __init__(self, core: Any) -> None:
|
||||
self.core: BCSCore = core
|
||||
|
||||
async def get(self, request: web.Request) -> web.Response:
|
||||
repo_id = request.query.get("repo_id")
|
||||
if not repo_id:
|
||||
return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400)
|
||||
return self.error("Missing required query parameter: repo_id", status=400)
|
||||
|
||||
maybe_md = await self.core.fetch_readme_markdown(repo_id)
|
||||
_LOGGER.info("BCS install dry-run requested (repo_id=%s)", repo_id)
|
||||
|
||||
md = _extract_text_recursive(maybe_md)
|
||||
if not md or not md.strip():
|
||||
t = type(maybe_md).__name__
|
||||
return web.json_response(
|
||||
{"ok": False, "message": f"README not found or unsupported format (got {t})."},
|
||||
status=404,
|
||||
try:
|
||||
result = await core.async_install_dry_run(repo_id)
|
||||
except core.RepoNotFoundError:
|
||||
_LOGGER.warning("BCS install dry-run failed: repo not found (repo_id=%s)", repo_id)
|
||||
return self.error("Repository not found", status=404)
|
||||
except core.DomainResolutionError as err:
|
||||
_LOGGER.warning(
|
||||
"BCS install dry-run failed: domain resolution error (repo_id=%s): %s",
|
||||
repo_id,
|
||||
err,
|
||||
)
|
||||
return self.error(str(err), status=422)
|
||||
except Exception as err: # pylint: disable=broad-exception-caught
|
||||
_LOGGER.exception("BCS install dry-run failed (repo_id=%s): %s", repo_id, err)
|
||||
return self.error("Internal error", status=500)
|
||||
|
||||
return self.json(
|
||||
{
|
||||
"ok": True,
|
||||
"action": "install",
|
||||
"domain": result["domain"],
|
||||
},
|
||||
status=200,
|
||||
)
|
||||
|
||||
md_str = str(md)
|
||||
html = _render_markdown_server_side(md_str)
|
||||
return web.json_response({"ok": True, "readme": md_str, "html": html})
|
||||
|
||||
async def async_register_views(hass: HomeAssistant) -> None:
|
||||
"""Register HTTP API views."""
|
||||
hass.http.register_view(BcsInstallDryRunView())
|
||||
|
||||
Reference in New Issue
Block a user