diff --git a/custom_components/bahmcloud_store/providers.py b/custom_components/bahmcloud_store/providers.py
index 494f61d..7fc79bf 100644
--- a/custom_components/bahmcloud_store/providers.py
+++ b/custom_components/bahmcloud_store/providers.py
@@ -1,378 +1,242 @@
from __future__ import annotations
import logging
-import re
-import xml.etree.ElementTree as ET
-from dataclasses import dataclass
-from urllib.parse import quote_plus, urlparse
+from dataclasses import asdict
+from pathlib import Path
+from typing import Any, TYPE_CHECKING
-from homeassistant.core import HomeAssistant
-from homeassistant.helpers.aiohttp_client import async_get_clientsession
+from aiohttp import web
+from homeassistant.components.http import HomeAssistantView
+
+from .providers import fetch_readme_markdown_from_repo
+
+if TYPE_CHECKING:
+ from .core import BCSCore # typing only
_LOGGER = logging.getLogger(__name__)
-UA = "BahmcloudStore (Home Assistant)"
-
-@dataclass
-class RepoInfo:
- owner: str | None = None
- repo_name: str | None = None
- description: str | None = None
- provider: str | None = None
- default_branch: str | None = None
-
- latest_version: str | None = None
- latest_version_source: str | None = None # "release" | "tag" | "atom" | None
-
-
-def _normalize_repo_name(name: str | None) -> str | None:
- if not name:
- return None
- n = name.strip()
- if n.endswith(".git"):
- n = n[:-4]
- return n or None
-
-
-def _split_owner_repo(repo_url: str) -> tuple[str | None, str | None]:
- u = urlparse(repo_url.rstrip("/"))
- parts = [p for p in u.path.strip("/").split("/") if p]
- if len(parts) < 2:
- return None, None
- owner = parts[0].strip() or None
- repo = _normalize_repo_name(parts[1])
- return owner, repo
-
-
-def detect_provider(repo_url: str) -> str:
- host = urlparse(repo_url).netloc.lower()
- if "github.com" in host:
- return "github"
- if "gitlab" in host:
- return "gitlab"
-
- owner, repo = _split_owner_repo(repo_url)
- if owner and repo:
- return "gitea"
-
- return "generic"
-
-
-async def _safe_json(session, url: str, *, headers: dict | None = None, timeout: int = 20):
- try:
- async with session.get(url, timeout=timeout, headers=headers) as resp:
- status = resp.status
- if status != 200:
- return None, status
- return await resp.json(), status
- except Exception:
- return None, None
-
-
-async def _safe_text(session, url: str, *, headers: dict | None = None, timeout: int = 20):
- try:
- async with session.get(url, timeout=timeout, headers=headers) as resp:
- status = resp.status
- if status != 200:
- return None, status
- return await resp.text(), status
- except Exception:
- return None, None
-
-
-def _extract_tag_from_github_url(url: str) -> str | None:
- m = re.search(r"/releases/tag/([^/?#]+)", url)
- if m:
- return m.group(1)
- m = re.search(r"/tag/([^/?#]+)", url)
- if m:
- return m.group(1)
- return None
-
-
-def _strip_html(s: str) -> str:
- # minimal HTML entity cleanup for meta descriptions
- out = (
- s.replace("&", "&")
- .replace(""", '"')
- .replace("'", "'")
- .replace("<", "<")
- .replace(">", ">")
- )
- return re.sub(r"\s+", " ", out).strip()
-
-
-def _extract_meta(html: str, *, prop: str | None = None, name: str | None = None) -> str | None:
- # Extract
- # or
- if prop:
- # property="..." content="..."
- m = re.search(
- r']+property=["\']' + re.escape(prop) + r'["\'][^>]+content=["\']([^"\']+)["\']',
- html,
- flags=re.IGNORECASE,
- )
- if m:
- return _strip_html(m.group(1))
- m = re.search(
- r']+content=["\']([^"\']+)["\'][^>]+property=["\']' + re.escape(prop) + r'["\']',
- html,
- flags=re.IGNORECASE,
- )
- if m:
- return _strip_html(m.group(1))
-
- if name:
- m = re.search(
- r']+name=["\']' + re.escape(name) + r'["\'][^>]+content=["\']([^"\']+)["\']',
- html,
- flags=re.IGNORECASE,
- )
- if m:
- return _strip_html(m.group(1))
- m = re.search(
- r']+content=["\']([^"\']+)["\'][^>]+name=["\']' + re.escape(name) + r'["\']',
- html,
- flags=re.IGNORECASE,
- )
- if m:
- return _strip_html(m.group(1))
-
- return None
-
-
-async def _github_description_html(hass: HomeAssistant, owner: str, repo: str) -> str | None:
- """
- GitHub API may be rate-limited; fetch public HTML and read meta description.
- """
- session = async_get_clientsession(hass)
- headers = {
- "User-Agent": UA,
- "Accept": "text/html,application/xhtml+xml",
- }
-
- html, status = await _safe_text(session, f"https://github.com/{owner}/{repo}", headers=headers)
- if not html or status != 200:
+def _render_markdown_server_side(md: str) -> str | None:
+ """Render Markdown -> sanitized HTML (server-side)."""
+ text = (md or "").strip()
+ if not text:
return None
- desc = _extract_meta(html, prop="og:description")
- if desc:
- return desc
-
- desc = _extract_meta(html, name="description")
- if desc:
- return desc
-
- return None
-
-
-async def _github_latest_version_atom(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
- headers = {"User-Agent": UA, "Accept": "application/atom+xml,text/xml;q=0.9,*/*;q=0.8"}
-
- xml_text, _ = await _safe_text(session, f"https://github.com/{owner}/{repo}/releases.atom", headers=headers)
- if not xml_text:
- return None, None
+ html: str | None = None
try:
- root = ET.fromstring(xml_text)
- except Exception:
- return None, None
+ import markdown as mdlib # type: ignore
- for entry in root.findall(".//{*}entry"):
- for link in entry.findall(".//{*}link"):
- href = link.attrib.get("href")
- if not href:
- continue
- tag = _extract_tag_from_github_url(href)
- if tag:
- return tag, "atom"
+ html = mdlib.markdown(
+ text,
+ extensions=["fenced_code", "tables", "sane_lists", "toc"],
+ output_format="html5",
+ )
+ except Exception as e:
+ _LOGGER.debug("python-markdown render failed: %s", e)
+ html = None
- return None, None
-
-
-async def _github_latest_version_redirect(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
- headers = {"User-Agent": UA}
- url = f"https://github.com/{owner}/{repo}/releases/latest"
- try:
- async with session.head(url, allow_redirects=False, timeout=15, headers=headers) as resp:
- if resp.status in (301, 302, 303, 307, 308):
- loc = resp.headers.get("Location")
- if loc:
- tag = _extract_tag_from_github_url(loc)
- if tag:
- return tag, "release"
- except Exception:
- pass
- return None, None
-
-
-async def _github_latest_version_api(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
- headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
-
- data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/releases/latest", headers=headers)
- if isinstance(data, dict):
- tag = data.get("tag_name") or data.get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "release"
-
- data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=1", headers=headers)
- if isinstance(data, list) and data:
- tag = data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "tag"
-
- return None, None
-
-
-async def _github_latest_version(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
- tag, src = await _github_latest_version_atom(hass, owner, repo)
- if tag:
- return tag, src
-
- tag, src = await _github_latest_version_redirect(hass, owner, repo)
- if tag:
- return tag, src
-
- return await _github_latest_version_api(hass, owner, repo)
-
-
-async def _gitea_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
-
- data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/releases?limit=1")
- if isinstance(data, list) and data:
- tag = data[0].get("tag_name") or data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "release"
-
- data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/tags?limit=1")
- if isinstance(data, list) and data:
- tag = data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "tag"
-
- return None, None
-
-
-async def _gitlab_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
- headers = {"User-Agent": UA}
- project = quote_plus(f"{owner}/{repo}")
-
- data, _ = await _safe_json(
- session,
- f"{base}/api/v4/projects/{project}/releases?per_page=1&order_by=released_at&sort=desc",
- headers=headers,
- )
- if isinstance(data, list) and data:
- tag = data[0].get("tag_name") or data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "release"
-
- data, _ = await _safe_json(
- session,
- f"{base}/api/v4/projects/{project}/repository/tags?per_page=1&order_by=updated&sort=desc",
- headers=headers,
- )
- if isinstance(data, list) and data:
- tag = data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "tag"
-
- return None, None
-
-
-async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo:
- provider = detect_provider(repo_url)
- owner, repo = _split_owner_repo(repo_url)
-
- info = RepoInfo(
- owner=owner,
- repo_name=repo,
- description=None,
- provider=provider,
- default_branch=None,
- latest_version=None,
- latest_version_source=None,
- )
-
- if not owner or not repo:
- return info
-
- session = async_get_clientsession(hass)
+ if not html:
+ return None
try:
- if provider == "github":
- # Try API repo details (may be rate-limited)
- headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
- data, status = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}", headers=headers)
+ import bleach # type: ignore
- if isinstance(data, dict):
- info.description = data.get("description")
- info.repo_name = _normalize_repo_name(data.get("name")) or repo
- info.default_branch = data.get("default_branch") or "main"
- if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
- info.owner = data["owner"]["login"]
- else:
- # If API blocked, still set reasonable defaults
- if status == 403:
- _LOGGER.debug("GitHub API blocked/rate-limited for repo info %s/%s", owner, repo)
- info.default_branch = "main"
+ allowed_tags = [
+ "p",
+ "br",
+ "hr",
+ "div",
+ "span",
+ "blockquote",
+ "pre",
+ "code",
+ "h1",
+ "h2",
+ "h3",
+ "h4",
+ "h5",
+ "h6",
+ "ul",
+ "ol",
+ "li",
+ "strong",
+ "em",
+ "b",
+ "i",
+ "u",
+ "s",
+ "a",
+ "img",
+ "table",
+ "thead",
+ "tbody",
+ "tr",
+ "th",
+ "td",
+ ]
- # If description missing, fetch from GitHub HTML
- if not info.description:
- desc = await _github_description_html(hass, owner, repo)
- if desc:
- info.description = desc
+ allowed_attrs = {
+ "a": ["href", "title", "target", "rel"],
+ "img": ["src", "alt", "title"],
+ "th": ["align"],
+ "td": ["align"],
+ "*": ["class"],
+ }
- ver, src = await _github_latest_version(hass, owner, repo)
- info.latest_version = ver
- info.latest_version_source = src
- return info
+ sanitized = bleach.clean(
+ html,
+ tags=allowed_tags,
+ attributes=allowed_attrs,
+ protocols=["http", "https", "mailto"],
+ strip=True,
+ )
- if provider == "gitlab":
- u = urlparse(repo_url.rstrip("/"))
- base = f"{u.scheme}://{u.netloc}"
- headers = {"User-Agent": UA}
- project = quote_plus(f"{owner}/{repo}")
-
- data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}", headers=headers)
- if isinstance(data, dict):
- info.description = data.get("description")
- info.repo_name = _normalize_repo_name(data.get("path")) or repo
- info.default_branch = data.get("default_branch") or "main"
- ns = data.get("namespace")
- if isinstance(ns, dict) and ns.get("path"):
- info.owner = ns.get("path")
-
- ver, src = await _gitlab_latest_version(hass, base, owner, repo)
- info.latest_version = ver
- info.latest_version_source = src
- return info
-
- if provider == "gitea":
- u = urlparse(repo_url.rstrip("/"))
- base = f"{u.scheme}://{u.netloc}"
-
- data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}")
- if isinstance(data, dict):
- info.description = data.get("description")
- info.repo_name = _normalize_repo_name(data.get("name")) or repo
- info.default_branch = data.get("default_branch") or "main"
- if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
- info.owner = data["owner"]["login"]
-
- ver, src = await _gitea_latest_version(hass, base, owner, repo)
- info.latest_version = ver
- info.latest_version_source = src
- return info
-
- return info
+ sanitized = sanitized.replace(
+ ' web.Response:
+ base = Path(__file__).resolve().parent / "panel"
+ base_resolved = base.resolve()
+
+ req_path = (path or "").lstrip("/")
+ if req_path == "":
+ req_path = "index.html"
+
+ target = (base / req_path).resolve()
+
+ if not str(target).startswith(str(base_resolved)):
+ return web.Response(status=404)
+
+ if target.is_dir():
+ target = (target / "index.html").resolve()
+
+ if not target.exists():
+ _LOGGER.error("BCS static asset not found: %s", target)
+ return web.Response(status=404)
+
+ content_type = "text/plain"
+ charset = None
+
+ if target.suffix == ".js":
+ content_type = "application/javascript"
+ charset = "utf-8"
+ elif target.suffix == ".html":
+ content_type = "text/html"
+ charset = "utf-8"
+ elif target.suffix == ".css":
+ content_type = "text/css"
+ charset = "utf-8"
+ elif target.suffix == ".svg":
+ content_type = "image/svg+xml"
+ elif target.suffix == ".png":
+ content_type = "image/png"
+
+ resp = web.Response(body=target.read_bytes(), content_type=content_type, charset=charset)
+ resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
+ resp.headers["Pragma"] = "no-cache"
+ return resp
+
+
+class BCSApiView(HomeAssistantView):
+ url = "/api/bcs"
+ name = "api:bcs"
+ requires_auth = True
+
+ def __init__(self, core: Any) -> None:
+ self.core = core
+
+ async def get(self, request: web.Request) -> web.Response:
+ return web.json_response(
+ {"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()}
+ )
+
+ async def post(self, request: web.Request) -> web.Response:
+ data = await request.json()
+ op = data.get("op")
+
+ if op == "add_custom_repo":
+ url = str(data.get("url") or "").strip()
+ name = data.get("name")
+ name = str(name).strip() if name else None
+ if not url:
+ return web.json_response({"ok": False, "message": "Missing url"}, status=400)
+ repo = await self.core.add_custom_repo(url=url, name=name)
+ return web.json_response({"ok": True, "repo": asdict(repo)})
+
+ return web.json_response({"ok": False, "message": "Unknown operation"}, status=400)
+
+
+class BCSCustomRepoView(HomeAssistantView):
+ url = "/api/bcs/custom_repo"
+ name = "api:bcs_custom_repo"
+ requires_auth = True
+
+ def __init__(self, core: Any) -> None:
+ self.core = core
+
+ async def delete(self, request: web.Request) -> web.Response:
+ repo_id = request.query.get("id")
+ if not repo_id:
+ return web.json_response({"ok": False, "message": "Missing id"}, status=400)
+ await self.core.remove_custom_repo(repo_id)
+ return web.json_response({"ok": True})
+
+
+class BCSReadmeView(HomeAssistantView):
+ url = "/api/bcs/readme"
+ name = "api:bcs_readme"
+ requires_auth = True
+
+ def __init__(self, core: Any) -> None:
+ self.core = core
+
+ async def get(self, request: web.Request) -> web.Response:
+ repo_id = request.query.get("repo_id")
+ if not repo_id:
+ return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400)
+
+ repos = self.core.list_repos_public()
+ repo = next((r for r in repos if str(r.get("id", "")) == str(repo_id)), None)
+ if not repo:
+ return web.json_response({"ok": False, "message": "Repository not found."}, status=404)
+
+ repo_url = repo.get("url")
+ provider = repo.get("provider")
+ default_branch = repo.get("default_branch")
+
+ if not isinstance(repo_url, str) or not repo_url.strip():
+ return web.json_response({"ok": False, "message": "Repository URL missing."}, status=404)
+
+ md = await fetch_readme_markdown_from_repo(
+ self.core.hass,
+ repo_url=repo_url,
+ provider=provider if isinstance(provider, str) else None,
+ default_branch=default_branch if isinstance(default_branch, str) else None,
+ )
+
+ if not md or not md.strip():
+ return web.json_response(
+ {
+ "ok": False,
+ "message": "README not found (raw endpoint returned 404).",
+ },
+ status=404,
+ )
+
+ html = _render_markdown_server_side(md)
+ return web.json_response({"ok": True, "readme": md, "html": html})