From 692f0b47daddb4cb65f2b565101d54e297d6660a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Bachmann?= Date: Thu, 15 Jan 2026 12:54:13 +0000 Subject: [PATCH] custom_components/bahmcloud_store/providers.py aktualisiert --- .../bahmcloud_store/providers.py | 576 +++++++----------- 1 file changed, 220 insertions(+), 356 deletions(-) diff --git a/custom_components/bahmcloud_store/providers.py b/custom_components/bahmcloud_store/providers.py index 494f61d..7fc79bf 100644 --- a/custom_components/bahmcloud_store/providers.py +++ b/custom_components/bahmcloud_store/providers.py @@ -1,378 +1,242 @@ from __future__ import annotations import logging -import re -import xml.etree.ElementTree as ET -from dataclasses import dataclass -from urllib.parse import quote_plus, urlparse +from dataclasses import asdict +from pathlib import Path +from typing import Any, TYPE_CHECKING -from homeassistant.core import HomeAssistant -from homeassistant.helpers.aiohttp_client import async_get_clientsession +from aiohttp import web +from homeassistant.components.http import HomeAssistantView + +from .providers import fetch_readme_markdown_from_repo + +if TYPE_CHECKING: + from .core import BCSCore # typing only _LOGGER = logging.getLogger(__name__) -UA = "BahmcloudStore (Home Assistant)" - -@dataclass -class RepoInfo: - owner: str | None = None - repo_name: str | None = None - description: str | None = None - provider: str | None = None - default_branch: str | None = None - - latest_version: str | None = None - latest_version_source: str | None = None # "release" | "tag" | "atom" | None - - -def _normalize_repo_name(name: str | None) -> str | None: - if not name: - return None - n = name.strip() - if n.endswith(".git"): - n = n[:-4] - return n or None - - -def _split_owner_repo(repo_url: str) -> tuple[str | None, str | None]: - u = urlparse(repo_url.rstrip("/")) - parts = [p for p in u.path.strip("/").split("/") if p] - if len(parts) < 2: - return None, None - owner = parts[0].strip() or None - repo = _normalize_repo_name(parts[1]) - return owner, repo - - -def detect_provider(repo_url: str) -> str: - host = urlparse(repo_url).netloc.lower() - if "github.com" in host: - return "github" - if "gitlab" in host: - return "gitlab" - - owner, repo = _split_owner_repo(repo_url) - if owner and repo: - return "gitea" - - return "generic" - - -async def _safe_json(session, url: str, *, headers: dict | None = None, timeout: int = 20): - try: - async with session.get(url, timeout=timeout, headers=headers) as resp: - status = resp.status - if status != 200: - return None, status - return await resp.json(), status - except Exception: - return None, None - - -async def _safe_text(session, url: str, *, headers: dict | None = None, timeout: int = 20): - try: - async with session.get(url, timeout=timeout, headers=headers) as resp: - status = resp.status - if status != 200: - return None, status - return await resp.text(), status - except Exception: - return None, None - - -def _extract_tag_from_github_url(url: str) -> str | None: - m = re.search(r"/releases/tag/([^/?#]+)", url) - if m: - return m.group(1) - m = re.search(r"/tag/([^/?#]+)", url) - if m: - return m.group(1) - return None - - -def _strip_html(s: str) -> str: - # minimal HTML entity cleanup for meta descriptions - out = ( - s.replace("&", "&") - .replace(""", '"') - .replace("'", "'") - .replace("<", "<") - .replace(">", ">") - ) - return re.sub(r"\s+", " ", out).strip() - - -def _extract_meta(html: str, *, prop: str | None = None, name: str | None = None) -> str | None: - # Extract - # or - if prop: - # property="..." content="..." - m = re.search( - r']+property=["\']' + re.escape(prop) + r'["\'][^>]+content=["\']([^"\']+)["\']', - html, - flags=re.IGNORECASE, - ) - if m: - return _strip_html(m.group(1)) - m = re.search( - r']+content=["\']([^"\']+)["\'][^>]+property=["\']' + re.escape(prop) + r'["\']', - html, - flags=re.IGNORECASE, - ) - if m: - return _strip_html(m.group(1)) - - if name: - m = re.search( - r']+name=["\']' + re.escape(name) + r'["\'][^>]+content=["\']([^"\']+)["\']', - html, - flags=re.IGNORECASE, - ) - if m: - return _strip_html(m.group(1)) - m = re.search( - r']+content=["\']([^"\']+)["\'][^>]+name=["\']' + re.escape(name) + r'["\']', - html, - flags=re.IGNORECASE, - ) - if m: - return _strip_html(m.group(1)) - - return None - - -async def _github_description_html(hass: HomeAssistant, owner: str, repo: str) -> str | None: - """ - GitHub API may be rate-limited; fetch public HTML and read meta description. - """ - session = async_get_clientsession(hass) - headers = { - "User-Agent": UA, - "Accept": "text/html,application/xhtml+xml", - } - - html, status = await _safe_text(session, f"https://github.com/{owner}/{repo}", headers=headers) - if not html or status != 200: +def _render_markdown_server_side(md: str) -> str | None: + """Render Markdown -> sanitized HTML (server-side).""" + text = (md or "").strip() + if not text: return None - desc = _extract_meta(html, prop="og:description") - if desc: - return desc - - desc = _extract_meta(html, name="description") - if desc: - return desc - - return None - - -async def _github_latest_version_atom(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]: - session = async_get_clientsession(hass) - headers = {"User-Agent": UA, "Accept": "application/atom+xml,text/xml;q=0.9,*/*;q=0.8"} - - xml_text, _ = await _safe_text(session, f"https://github.com/{owner}/{repo}/releases.atom", headers=headers) - if not xml_text: - return None, None + html: str | None = None try: - root = ET.fromstring(xml_text) - except Exception: - return None, None + import markdown as mdlib # type: ignore - for entry in root.findall(".//{*}entry"): - for link in entry.findall(".//{*}link"): - href = link.attrib.get("href") - if not href: - continue - tag = _extract_tag_from_github_url(href) - if tag: - return tag, "atom" + html = mdlib.markdown( + text, + extensions=["fenced_code", "tables", "sane_lists", "toc"], + output_format="html5", + ) + except Exception as e: + _LOGGER.debug("python-markdown render failed: %s", e) + html = None - return None, None - - -async def _github_latest_version_redirect(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]: - session = async_get_clientsession(hass) - headers = {"User-Agent": UA} - url = f"https://github.com/{owner}/{repo}/releases/latest" - try: - async with session.head(url, allow_redirects=False, timeout=15, headers=headers) as resp: - if resp.status in (301, 302, 303, 307, 308): - loc = resp.headers.get("Location") - if loc: - tag = _extract_tag_from_github_url(loc) - if tag: - return tag, "release" - except Exception: - pass - return None, None - - -async def _github_latest_version_api(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]: - session = async_get_clientsession(hass) - headers = {"Accept": "application/vnd.github+json", "User-Agent": UA} - - data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/releases/latest", headers=headers) - if isinstance(data, dict): - tag = data.get("tag_name") or data.get("name") - if isinstance(tag, str) and tag.strip(): - return tag.strip(), "release" - - data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=1", headers=headers) - if isinstance(data, list) and data: - tag = data[0].get("name") - if isinstance(tag, str) and tag.strip(): - return tag.strip(), "tag" - - return None, None - - -async def _github_latest_version(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]: - tag, src = await _github_latest_version_atom(hass, owner, repo) - if tag: - return tag, src - - tag, src = await _github_latest_version_redirect(hass, owner, repo) - if tag: - return tag, src - - return await _github_latest_version_api(hass, owner, repo) - - -async def _gitea_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]: - session = async_get_clientsession(hass) - - data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/releases?limit=1") - if isinstance(data, list) and data: - tag = data[0].get("tag_name") or data[0].get("name") - if isinstance(tag, str) and tag.strip(): - return tag.strip(), "release" - - data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/tags?limit=1") - if isinstance(data, list) and data: - tag = data[0].get("name") - if isinstance(tag, str) and tag.strip(): - return tag.strip(), "tag" - - return None, None - - -async def _gitlab_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]: - session = async_get_clientsession(hass) - headers = {"User-Agent": UA} - project = quote_plus(f"{owner}/{repo}") - - data, _ = await _safe_json( - session, - f"{base}/api/v4/projects/{project}/releases?per_page=1&order_by=released_at&sort=desc", - headers=headers, - ) - if isinstance(data, list) and data: - tag = data[0].get("tag_name") or data[0].get("name") - if isinstance(tag, str) and tag.strip(): - return tag.strip(), "release" - - data, _ = await _safe_json( - session, - f"{base}/api/v4/projects/{project}/repository/tags?per_page=1&order_by=updated&sort=desc", - headers=headers, - ) - if isinstance(data, list) and data: - tag = data[0].get("name") - if isinstance(tag, str) and tag.strip(): - return tag.strip(), "tag" - - return None, None - - -async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo: - provider = detect_provider(repo_url) - owner, repo = _split_owner_repo(repo_url) - - info = RepoInfo( - owner=owner, - repo_name=repo, - description=None, - provider=provider, - default_branch=None, - latest_version=None, - latest_version_source=None, - ) - - if not owner or not repo: - return info - - session = async_get_clientsession(hass) + if not html: + return None try: - if provider == "github": - # Try API repo details (may be rate-limited) - headers = {"Accept": "application/vnd.github+json", "User-Agent": UA} - data, status = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}", headers=headers) + import bleach # type: ignore - if isinstance(data, dict): - info.description = data.get("description") - info.repo_name = _normalize_repo_name(data.get("name")) or repo - info.default_branch = data.get("default_branch") or "main" - if isinstance(data.get("owner"), dict) and data["owner"].get("login"): - info.owner = data["owner"]["login"] - else: - # If API blocked, still set reasonable defaults - if status == 403: - _LOGGER.debug("GitHub API blocked/rate-limited for repo info %s/%s", owner, repo) - info.default_branch = "main" + allowed_tags = [ + "p", + "br", + "hr", + "div", + "span", + "blockquote", + "pre", + "code", + "h1", + "h2", + "h3", + "h4", + "h5", + "h6", + "ul", + "ol", + "li", + "strong", + "em", + "b", + "i", + "u", + "s", + "a", + "img", + "table", + "thead", + "tbody", + "tr", + "th", + "td", + ] - # If description missing, fetch from GitHub HTML - if not info.description: - desc = await _github_description_html(hass, owner, repo) - if desc: - info.description = desc + allowed_attrs = { + "a": ["href", "title", "target", "rel"], + "img": ["src", "alt", "title"], + "th": ["align"], + "td": ["align"], + "*": ["class"], + } - ver, src = await _github_latest_version(hass, owner, repo) - info.latest_version = ver - info.latest_version_source = src - return info + sanitized = bleach.clean( + html, + tags=allowed_tags, + attributes=allowed_attrs, + protocols=["http", "https", "mailto"], + strip=True, + ) - if provider == "gitlab": - u = urlparse(repo_url.rstrip("/")) - base = f"{u.scheme}://{u.netloc}" - headers = {"User-Agent": UA} - project = quote_plus(f"{owner}/{repo}") - - data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}", headers=headers) - if isinstance(data, dict): - info.description = data.get("description") - info.repo_name = _normalize_repo_name(data.get("path")) or repo - info.default_branch = data.get("default_branch") or "main" - ns = data.get("namespace") - if isinstance(ns, dict) and ns.get("path"): - info.owner = ns.get("path") - - ver, src = await _gitlab_latest_version(hass, base, owner, repo) - info.latest_version = ver - info.latest_version_source = src - return info - - if provider == "gitea": - u = urlparse(repo_url.rstrip("/")) - base = f"{u.scheme}://{u.netloc}" - - data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}") - if isinstance(data, dict): - info.description = data.get("description") - info.repo_name = _normalize_repo_name(data.get("name")) or repo - info.default_branch = data.get("default_branch") or "main" - if isinstance(data.get("owner"), dict) and data["owner"].get("login"): - info.owner = data["owner"]["login"] - - ver, src = await _gitea_latest_version(hass, base, owner, repo) - info.latest_version = ver - info.latest_version_source = src - return info - - return info + sanitized = sanitized.replace( + ' web.Response: + base = Path(__file__).resolve().parent / "panel" + base_resolved = base.resolve() + + req_path = (path or "").lstrip("/") + if req_path == "": + req_path = "index.html" + + target = (base / req_path).resolve() + + if not str(target).startswith(str(base_resolved)): + return web.Response(status=404) + + if target.is_dir(): + target = (target / "index.html").resolve() + + if not target.exists(): + _LOGGER.error("BCS static asset not found: %s", target) + return web.Response(status=404) + + content_type = "text/plain" + charset = None + + if target.suffix == ".js": + content_type = "application/javascript" + charset = "utf-8" + elif target.suffix == ".html": + content_type = "text/html" + charset = "utf-8" + elif target.suffix == ".css": + content_type = "text/css" + charset = "utf-8" + elif target.suffix == ".svg": + content_type = "image/svg+xml" + elif target.suffix == ".png": + content_type = "image/png" + + resp = web.Response(body=target.read_bytes(), content_type=content_type, charset=charset) + resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0" + resp.headers["Pragma"] = "no-cache" + return resp + + +class BCSApiView(HomeAssistantView): + url = "/api/bcs" + name = "api:bcs" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core = core + + async def get(self, request: web.Request) -> web.Response: + return web.json_response( + {"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()} + ) + + async def post(self, request: web.Request) -> web.Response: + data = await request.json() + op = data.get("op") + + if op == "add_custom_repo": + url = str(data.get("url") or "").strip() + name = data.get("name") + name = str(name).strip() if name else None + if not url: + return web.json_response({"ok": False, "message": "Missing url"}, status=400) + repo = await self.core.add_custom_repo(url=url, name=name) + return web.json_response({"ok": True, "repo": asdict(repo)}) + + return web.json_response({"ok": False, "message": "Unknown operation"}, status=400) + + +class BCSCustomRepoView(HomeAssistantView): + url = "/api/bcs/custom_repo" + name = "api:bcs_custom_repo" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core = core + + async def delete(self, request: web.Request) -> web.Response: + repo_id = request.query.get("id") + if not repo_id: + return web.json_response({"ok": False, "message": "Missing id"}, status=400) + await self.core.remove_custom_repo(repo_id) + return web.json_response({"ok": True}) + + +class BCSReadmeView(HomeAssistantView): + url = "/api/bcs/readme" + name = "api:bcs_readme" + requires_auth = True + + def __init__(self, core: Any) -> None: + self.core = core + + async def get(self, request: web.Request) -> web.Response: + repo_id = request.query.get("repo_id") + if not repo_id: + return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400) + + repos = self.core.list_repos_public() + repo = next((r for r in repos if str(r.get("id", "")) == str(repo_id)), None) + if not repo: + return web.json_response({"ok": False, "message": "Repository not found."}, status=404) + + repo_url = repo.get("url") + provider = repo.get("provider") + default_branch = repo.get("default_branch") + + if not isinstance(repo_url, str) or not repo_url.strip(): + return web.json_response({"ok": False, "message": "Repository URL missing."}, status=404) + + md = await fetch_readme_markdown_from_repo( + self.core.hass, + repo_url=repo_url, + provider=provider if isinstance(provider, str) else None, + default_branch=default_branch if isinstance(default_branch, str) else None, + ) + + if not md or not md.strip(): + return web.json_response( + { + "ok": False, + "message": "README not found (raw endpoint returned 404).", + }, + status=404, + ) + + html = _render_markdown_server_side(md) + return web.json_response({"ok": True, "readme": md, "html": html})