"""Repository provider helpers for the Bahmcloud Store integration.

Given a repository URL, this module detects the hosting provider
(GitHub, GitLab, Gitea or "generic"), collects basic repository metadata
(owner, name, description, default branch, latest version) and fetches the
README as raw Markdown text.

All network access is best-effort: failed requests are logged at debug level
and reported as ``None`` instead of raising.
"""

from __future__ import annotations

import logging
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from html import unescape
from urllib.parse import quote_plus, urlparse

from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession

_LOGGER = logging.getLogger(__name__)

UA = "BahmcloudStore (Home Assistant)"

# README filename variants that are tried, in this order, on every branch.
_README_CANDIDATES = ("README.md", "Readme.md", "README.MD", "README")

_RELEASE_TAG_RE = re.compile(r"/releases/tag/([^/?#]+)")
_TAG_RE = re.compile(r"/tag/([^/?#]+)")
_WHITESPACE_RE = re.compile(r"\s+")


@dataclass
class RepoInfo:
    """Metadata collected for a single repository."""

    owner: str | None = None
    repo_name: str | None = None
    description: str | None = None
    provider: str | None = None
    default_branch: str | None = None

    latest_version: str | None = None
    latest_version_source: str | None = None  # "release" | "tag" | "atom" | None


def _normalize_repo_name(name: str | None) -> str | None:
    """Return *name* stripped of whitespace and a trailing ``.git``, or None if empty."""
    if not name:
        return None
    n = name.strip()
    if n.endswith(".git"):
        n = n[:-4]
    return n or None


def _split_owner_repo(repo_url: str) -> tuple[str | None, str | None]:
    """Split a repository URL into ``(owner, repo)``.

    Only the first two path segments are used. Returns ``(None, None)`` when
    the URL path has fewer than two segments.
    """
    u = urlparse(repo_url.rstrip("/"))
    parts = [p for p in u.path.strip("/").split("/") if p]
    if len(parts) < 2:
        return None, None
    owner = parts[0].strip() or None
    repo = _normalize_repo_name(parts[1])
    return owner, repo


def detect_provider(repo_url: str) -> str:
    """Guess the hosting provider for *repo_url*.

    Returns one of ``"github"``, ``"gitlab"``, ``"gitea"`` or ``"generic"``.
    """
    host = (urlparse(repo_url).hostname or "").lower()

    # The GitHub code paths talk to hard-coded github.com endpoints, so only
    # the real domain (or a subdomain of it) may be classified as GitHub. A
    # plain substring test would also accept e.g. "github.com.example.org".
    if host == "github.com" or host.endswith(".github.com"):
        return "github"

    # Self-hosted GitLab instances live on arbitrary domains; matching on the
    # host name containing "gitlab" is a heuristic.
    if "gitlab" in host:
        return "gitlab"

    # Any other host with an <owner>/<repo> path is treated as Gitea.
    owner, repo = _split_owner_repo(repo_url)
    if owner and repo:
        return "gitea"

    return "generic"


async def _safe_fetch(
    session,
    url: str,
    *,
    as_json: bool,
    headers: dict | None,
    timeout: int,
):
    """GET *url* and return ``(payload, status)`` without ever raising.

    ``payload`` is the decoded JSON or the response text on HTTP 200 and
    ``None`` otherwise. ``status`` is ``None`` when the request or the
    decoding itself failed.
    """
    try:
        async with session.get(url, timeout=timeout, headers=headers) as resp:
            status = resp.status
            if status != 200:
                return None, status
            payload = await (resp.json() if as_json else resp.text())
            return payload, status
    except Exception as err:  # deliberate best-effort boundary
        _LOGGER.debug("Request to %s failed: %s", url, err)
        return None, None


async def _safe_json(session, url: str, *, headers: dict | None = None, timeout: int = 20):
    """Fetch *url* and return ``(parsed_json | None, status | None)``."""
    return await _safe_fetch(session, url, as_json=True, headers=headers, timeout=timeout)


async def _safe_text(session, url: str, *, headers: dict | None = None, timeout: int = 20):
    """Fetch *url* and return ``(text | None, status | None)``."""
    return await _safe_fetch(session, url, as_json=False, headers=headers, timeout=timeout)


def _extract_tag_from_github_url(url: str) -> str | None:
    """Extract the tag name from a GitHub release/tag URL, or return None."""
    for pattern in (_RELEASE_TAG_RE, _TAG_RE):
        m = pattern.search(url)
        if m:
            return m.group(1)
    return None


def _strip_html(s: str) -> str:
    """Decode HTML character references in *s* and collapse runs of whitespace."""
    return _WHITESPACE_RE.sub(" ", unescape(s)).strip()


def _extract_meta(html: str, *, prop: str | None = None, name: str | None = None) -> str | None:
    """Return the ``content`` of a ``<meta>`` tag selected by ``property`` or ``name``.

    ``prop`` is checked before ``name``; for each, both attribute orders
    (selector before ``content`` and ``content`` before selector) are tried.
    The returned text is entity-decoded and whitespace-normalized.
    """
    # The content value must end at the same quote character it started with,
    # so an apostrophe inside a double-quoted value does not truncate it.
    content = r'content=(?P<q>["\'])(?P<content>.+?)(?P=q)'

    for attr, value in (("property", prop), ("name", name)):
        if not value:
            continue
        selector = attr + r'=["\']' + re.escape(value) + r'["\']'
        patterns = (
            r"<meta[^>]+" + selector + r"[^>]+" + content,
            r"<meta[^>]+" + content + r"[^>]+" + selector,
        )
        for pattern in patterns:
            m = re.search(pattern, html, flags=re.IGNORECASE | re.DOTALL)
            if m:
                return _strip_html(m.group("content"))

    return None


def _tag_from_entry(entry: object, *keys: str) -> str | None:
    """Return the first truthy value among *keys* of *entry* as a stripped string.

    Returns None when *entry* is not a dict or the selected value is not a
    non-blank string.
    """
    if not isinstance(entry, dict):
        return None
    value = next((entry[k] for k in keys if entry.get(k)), None)
    if isinstance(value, str) and value.strip():
        return value.strip()
    return None


def _tag_from_list(data: object, *keys: str) -> str | None:
    """Return the tag of the first element of a JSON list response, if any."""
    if isinstance(data, list) and data:
        return _tag_from_entry(data[0], *keys)
    return None


async def _github_description_html(hass: HomeAssistant, owner: str, repo: str) -> str | None:
    """Scrape the repository description from the GitHub HTML page (no API)."""
    session = async_get_clientsession(hass)
    headers = {"User-Agent": UA, "Accept": "text/html,application/xhtml+xml"}

    html, status = await _safe_text(session, f"https://github.com/{owner}/{repo}", headers=headers)
    if not html or status != 200:
        return None

    return _extract_meta(html, prop="og:description") or _extract_meta(html, name="description")


async def _github_latest_version_atom(
    hass: HomeAssistant, owner: str, repo: str
) -> tuple[str | None, str | None]:
    """Read the newest tag from the repository's ``releases.atom`` feed."""
    session = async_get_clientsession(hass)
    headers = {"User-Agent": UA, "Accept": "application/atom+xml,text/xml;q=0.9,*/*;q=0.8"}

    xml_text, _ = await _safe_text(
        session, f"https://github.com/{owner}/{repo}/releases.atom", headers=headers
    )
    if not xml_text:
        return None, None

    # NOTE(review): xml.etree is not hardened against maliciously constructed
    # XML; the feed is only ever requested from github.com here.
    try:
        root = ET.fromstring(xml_text)
    except Exception as err:
        _LOGGER.debug("Could not parse releases.atom for %s/%s: %s", owner, repo, err)
        return None, None

    # The first entry link that contains a tag URL wins.
    for entry in root.findall(".//{*}entry"):
        for link in entry.findall(".//{*}link"):
            href = link.attrib.get("href")
            if not href:
                continue
            tag = _extract_tag_from_github_url(href)
            if tag:
                return tag, "atom"

    return None, None


async def _github_latest_version_redirect(
    hass: HomeAssistant, owner: str, repo: str
) -> tuple[str | None, str | None]:
    """Resolve the latest release via the ``/releases/latest`` redirect target."""
    session = async_get_clientsession(hass)
    headers = {"User-Agent": UA}
    url = f"https://github.com/{owner}/{repo}/releases/latest"

    try:
        # Redirects are not followed: the tag is read from the Location header.
        async with session.head(url, allow_redirects=False, timeout=15, headers=headers) as resp:
            if resp.status in (301, 302, 303, 307, 308):
                loc = resp.headers.get("Location")
                if loc:
                    tag = _extract_tag_from_github_url(loc)
                    if tag:
                        return tag, "release"
    except Exception as err:  # deliberate best-effort boundary
        _LOGGER.debug("HEAD %s failed: %s", url, err)

    return None, None


async def _github_latest_version_api(
    hass: HomeAssistant, owner: str, repo: str
) -> tuple[str | None, str | None]:
    """Query the GitHub REST API: latest release first, then the first listed tag."""
    session = async_get_clientsession(hass)
    headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
    api = f"https://api.github.com/repos/{owner}/{repo}"

    data, _ = await _safe_json(session, f"{api}/releases/latest", headers=headers)
    tag = _tag_from_entry(data, "tag_name", "name")
    if tag:
        return tag, "release"

    data, _ = await _safe_json(session, f"{api}/tags?per_page=1", headers=headers)
    tag = _tag_from_list(data, "name")
    if tag:
        return tag, "tag"

    return None, None


async def _github_latest_version(
    hass: HomeAssistant, owner: str, repo: str
) -> tuple[str | None, str | None]:
    """Determine the latest GitHub version: Atom feed, then redirect, then REST API."""
    tag, src = await _github_latest_version_atom(hass, owner, repo)
    if tag:
        return tag, src

    tag, src = await _github_latest_version_redirect(hass, owner, repo)
    if tag:
        return tag, src

    return await _github_latest_version_api(hass, owner, repo)


async def _gitea_latest_version(
    hass: HomeAssistant, base: str, owner: str, repo: str
) -> tuple[str | None, str | None]:
    """Query the Gitea API (v1): first release, then first tag."""
    session = async_get_clientsession(hass)
    api = f"{base}/api/v1/repos/{owner}/{repo}"

    data, _ = await _safe_json(session, f"{api}/releases?limit=1")
    tag = _tag_from_list(data, "tag_name", "name")
    if tag:
        return tag, "release"

    data, _ = await _safe_json(session, f"{api}/tags?limit=1")
    tag = _tag_from_list(data, "name")
    if tag:
        return tag, "tag"

    return None, None


async def _gitlab_latest_version(
    hass: HomeAssistant, base: str, owner: str, repo: str
) -> tuple[str | None, str | None]:
    """Query the GitLab API (v4): newest release, then most recently updated tag."""
    session = async_get_clientsession(hass)
    headers = {"User-Agent": UA}
    # The project path is passed URL-encoded ("owner%2Frepo").
    project = quote_plus(f"{owner}/{repo}")
    api = f"{base}/api/v4/projects/{project}"

    data, _ = await _safe_json(
        session,
        f"{api}/releases?per_page=1&order_by=released_at&sort=desc",
        headers=headers,
    )
    tag = _tag_from_list(data, "tag_name", "name")
    if tag:
        return tag, "release"

    data, _ = await _safe_json(
        session,
        f"{api}/repository/tags?per_page=1&order_by=updated&sort=desc",
        headers=headers,
    )
    tag = _tag_from_list(data, "name")
    if tag:
        return tag, "tag"

    return None, None


# -------------------------
# README fetching (RAW URLs)
# -------------------------

async def fetch_readme_markdown_from_repo(
    hass: HomeAssistant,
    repo_url: str,
    provider: str | None,
    default_branch: str | None,
) -> str | None:
    """Fetch the README as plain Markdown text using provider RAW endpoints (no API).

    Tries these filename variants, in order, on each branch:
      - README.md
      - Readme.md
      - README.MD
      - README

    The requested branch (default ``"main"``) is tried first. For GitHub,
    GitLab and Gitea, ``master`` is tried as well when it differs from the
    requested branch. Unknown providers get a single best-effort attempt
    using the GitLab URL layout.

    Returns the README text, or None when the URL has no ``<owner>/<repo>``
    path or no candidate could be fetched.
    """
    session = async_get_clientsession(hass)
    headers = {"User-Agent": UA, "Accept": "text/plain,text/markdown,text/*;q=0.9,*/*;q=0.8"}

    provider = (provider or detect_provider(repo_url)).lower()
    branch = (default_branch or "main").strip() or "main"

    u = urlparse(repo_url.rstrip("/"))
    base = f"{u.scheme}://{u.netloc}".rstrip("/")
    parts = [p for p in u.path.strip("/").split("/") if p]
    if len(parts) < 2:
        return None

    owner = parts[0]
    repo = _normalize_repo_name(parts[1]) or parts[1]

    # URL prefix up to (but excluding) "/<branch>/<filename>".
    if provider == "github":
        prefix = f"https://raw.githubusercontent.com/{owner}/{repo}"
        try_master = True
    elif provider == "gitea":
        # Gitea RAW: <base>/<owner>/<repo>/raw/branch/<branch>/<file>
        prefix = f"{base}/{owner}/{repo}/raw/branch"
        try_master = True
    else:
        # GitLab RAW: <base>/<owner>/<repo>/-/raw/<branch>/<file>
        # The same layout doubles as the generic fallback, but only real
        # GitLab hosts get the additional "master" attempt.
        prefix = f"{base}/{owner}/{repo}/-/raw"
        try_master = provider == "gitlab"

    branches = [branch]
    if try_master and branch != "master":
        branches.append("master")

    for ref in branches:
        for filename in _README_CANDIDATES:
            txt, status = await _safe_text(session, f"{prefix}/{ref}/{filename}", headers=headers)
            if txt and status == 200:
                return txt

    return None


async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo:
    """Collect provider, owner, name, description, branch and latest version.

    Never raises: on any failure the partially filled ``RepoInfo`` is
    returned. Fields that could not be determined stay ``None``.
    """
    provider = detect_provider(repo_url)
    owner, repo = _split_owner_repo(repo_url)

    info = RepoInfo(
        owner=owner,
        repo_name=repo,
        description=None,
        provider=provider,
        default_branch=None,
        latest_version=None,
        latest_version_source=None,
    )

    if not owner or not repo:
        return info

    session = async_get_clientsession(hass)

    try:
        if provider == "github":
            headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
            data, _status = await _safe_json(
                session, f"https://api.github.com/repos/{owner}/{repo}", headers=headers
            )
            if isinstance(data, dict):
                info.description = data.get("description")
                info.repo_name = _normalize_repo_name(data.get("name")) or repo
                info.default_branch = data.get("default_branch") or "main"
                if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
                    info.owner = data["owner"]["login"]
            else:
                # API unavailable: assume "main".
                info.default_branch = "main"

            # Fall back to scraping the HTML page when the API gave no description.
            if not info.description:
                desc = await _github_description_html(hass, owner, repo)
                if desc:
                    info.description = desc

            ver, src = await _github_latest_version(hass, owner, repo)
            info.latest_version = ver
            info.latest_version_source = src
            return info

        if provider == "gitlab":
            u = urlparse(repo_url.rstrip("/"))
            base = f"{u.scheme}://{u.netloc}"
            headers = {"User-Agent": UA}
            project = quote_plus(f"{owner}/{repo}")

            data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}", headers=headers)
            if isinstance(data, dict):
                info.description = data.get("description")
                info.repo_name = _normalize_repo_name(data.get("path")) or repo
                info.default_branch = data.get("default_branch") or "main"
                ns = data.get("namespace")
                if isinstance(ns, dict) and ns.get("path"):
                    info.owner = ns.get("path")

            ver, src = await _gitlab_latest_version(hass, base, owner, repo)
            info.latest_version = ver
            info.latest_version_source = src
            return info

        if provider == "gitea":
            u = urlparse(repo_url.rstrip("/"))
            base = f"{u.scheme}://{u.netloc}"

            data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}")
            if isinstance(data, dict):
                info.description = data.get("description")
                info.repo_name = _normalize_repo_name(data.get("name")) or repo
                info.default_branch = data.get("default_branch") or "main"
                if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
                    info.owner = data["owner"]["login"]

            ver, src = await _gitea_latest_version(hass, base, owner, repo)
            info.latest_version = ver
            info.latest_version_source = src
            return info

        return info

    except Exception as e:
        _LOGGER.debug("Provider fetch failed for %s: %s", repo_url, e)
        return info