Dateien nach "custom_components/bahmcloud_store" hochladen

This commit is contained in:
2026-01-15 14:08:27 +00:00
parent 1fc274bf7c
commit 6c3cdcde61
5 changed files with 1127 additions and 0 deletions

View File

@@ -0,0 +1,378 @@
from __future__ import annotations
import logging
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from urllib.parse import quote_plus, urlparse
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
_LOGGER = logging.getLogger(__name__)
UA = "BahmcloudStore (Home Assistant)"
@dataclass
class RepoInfo:
owner: str | None = None
repo_name: str | None = None
description: str | None = None
provider: str | None = None
default_branch: str | None = None
latest_version: str | None = None
latest_version_source: str | None = None # "release" | "tag" | "atom" | None
def _normalize_repo_name(name: str | None) -> str | None:
if not name:
return None
n = name.strip()
if n.endswith(".git"):
n = n[:-4]
return n or None
def _split_owner_repo(repo_url: str) -> tuple[str | None, str | None]:
u = urlparse(repo_url.rstrip("/"))
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
return None, None
owner = parts[0].strip() or None
repo = _normalize_repo_name(parts[1])
return owner, repo
def detect_provider(repo_url: str) -> str:
host = urlparse(repo_url).netloc.lower()
if "github.com" in host:
return "github"
if "gitlab" in host:
return "gitlab"
owner, repo = _split_owner_repo(repo_url)
if owner and repo:
return "gitea"
return "generic"
async def _safe_json(session, url: str, *, headers: dict | None = None, timeout: int = 20):
try:
async with session.get(url, timeout=timeout, headers=headers) as resp:
status = resp.status
if status != 200:
return None, status
return await resp.json(), status
except Exception:
return None, None
async def _safe_text(session, url: str, *, headers: dict | None = None, timeout: int = 20):
try:
async with session.get(url, timeout=timeout, headers=headers) as resp:
status = resp.status
if status != 200:
return None, status
return await resp.text(), status
except Exception:
return None, None
def _extract_tag_from_github_url(url: str) -> str | None:
m = re.search(r"/releases/tag/([^/?#]+)", url)
if m:
return m.group(1)
m = re.search(r"/tag/([^/?#]+)", url)
if m:
return m.group(1)
return None
def _strip_html(s: str) -> str:
# minimal HTML entity cleanup for meta descriptions
out = (
s.replace("&amp;", "&")
.replace("&quot;", '"')
.replace("&#39;", "'")
.replace("&lt;", "<")
.replace("&gt;", ">")
)
return re.sub(r"\s+", " ", out).strip()
def _extract_meta(html: str, *, prop: str | None = None, name: str | None = None) -> str | None:
# Extract <meta property="og:description" content="...">
# or <meta name="description" content="...">
if prop:
# property="..." content="..."
m = re.search(
r'<meta[^>]+property=["\']' + re.escape(prop) + r'["\'][^>]+content=["\']([^"\']+)["\']',
html,
flags=re.IGNORECASE,
)
if m:
return _strip_html(m.group(1))
m = re.search(
r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+property=["\']' + re.escape(prop) + r'["\']',
html,
flags=re.IGNORECASE,
)
if m:
return _strip_html(m.group(1))
if name:
m = re.search(
r'<meta[^>]+name=["\']' + re.escape(name) + r'["\'][^>]+content=["\']([^"\']+)["\']',
html,
flags=re.IGNORECASE,
)
if m:
return _strip_html(m.group(1))
m = re.search(
r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+name=["\']' + re.escape(name) + r'["\']',
html,
flags=re.IGNORECASE,
)
if m:
return _strip_html(m.group(1))
return None
async def _github_description_html(hass: HomeAssistant, owner: str, repo: str) -> str | None:
"""
GitHub API may be rate-limited; fetch public HTML and read meta description.
"""
session = async_get_clientsession(hass)
headers = {
"User-Agent": UA,
"Accept": "text/html,application/xhtml+xml",
}
html, status = await _safe_text(session, f"https://github.com/{owner}/{repo}", headers=headers)
if not html or status != 200:
return None
desc = _extract_meta(html, prop="og:description")
if desc:
return desc
desc = _extract_meta(html, name="description")
if desc:
return desc
return None
async def _github_latest_version_atom(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
headers = {"User-Agent": UA, "Accept": "application/atom+xml,text/xml;q=0.9,*/*;q=0.8"}
xml_text, _ = await _safe_text(session, f"https://github.com/{owner}/{repo}/releases.atom", headers=headers)
if not xml_text:
return None, None
try:
root = ET.fromstring(xml_text)
except Exception:
return None, None
for entry in root.findall(".//{*}entry"):
for link in entry.findall(".//{*}link"):
href = link.attrib.get("href")
if not href:
continue
tag = _extract_tag_from_github_url(href)
if tag:
return tag, "atom"
return None, None
async def _github_latest_version_redirect(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
headers = {"User-Agent": UA}
url = f"https://github.com/{owner}/{repo}/releases/latest"
try:
async with session.head(url, allow_redirects=False, timeout=15, headers=headers) as resp:
if resp.status in (301, 302, 303, 307, 308):
loc = resp.headers.get("Location")
if loc:
tag = _extract_tag_from_github_url(loc)
if tag:
return tag, "release"
except Exception:
pass
return None, None
async def _github_latest_version_api(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/releases/latest", headers=headers)
if isinstance(data, dict):
tag = data.get("tag_name") or data.get("name")
if isinstance(tag, str) and tag.strip():
return tag.strip(), "release"
data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=1", headers=headers)
if isinstance(data, list) and data:
tag = data[0].get("name")
if isinstance(tag, str) and tag.strip():
return tag.strip(), "tag"
return None, None
async def _github_latest_version(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
tag, src = await _github_latest_version_atom(hass, owner, repo)
if tag:
return tag, src
tag, src = await _github_latest_version_redirect(hass, owner, repo)
if tag:
return tag, src
return await _github_latest_version_api(hass, owner, repo)
async def _gitea_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/releases?limit=1")
if isinstance(data, list) and data:
tag = data[0].get("tag_name") or data[0].get("name")
if isinstance(tag, str) and tag.strip():
return tag.strip(), "release"
data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/tags?limit=1")
if isinstance(data, list) and data:
tag = data[0].get("name")
if isinstance(tag, str) and tag.strip():
return tag.strip(), "tag"
return None, None
async def _gitlab_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
headers = {"User-Agent": UA}
project = quote_plus(f"{owner}/{repo}")
data, _ = await _safe_json(
session,
f"{base}/api/v4/projects/{project}/releases?per_page=1&order_by=released_at&sort=desc",
headers=headers,
)
if isinstance(data, list) and data:
tag = data[0].get("tag_name") or data[0].get("name")
if isinstance(tag, str) and tag.strip():
return tag.strip(), "release"
data, _ = await _safe_json(
session,
f"{base}/api/v4/projects/{project}/repository/tags?per_page=1&order_by=updated&sort=desc",
headers=headers,
)
if isinstance(data, list) and data:
tag = data[0].get("name")
if isinstance(tag, str) and tag.strip():
return tag.strip(), "tag"
return None, None
async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo:
provider = detect_provider(repo_url)
owner, repo = _split_owner_repo(repo_url)
info = RepoInfo(
owner=owner,
repo_name=repo,
description=None,
provider=provider,
default_branch=None,
latest_version=None,
latest_version_source=None,
)
if not owner or not repo:
return info
session = async_get_clientsession(hass)
try:
if provider == "github":
# Try API repo details (may be rate-limited)
headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
data, status = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}", headers=headers)
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("name")) or repo
info.default_branch = data.get("default_branch") or "main"
if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
info.owner = data["owner"]["login"]
else:
# If API blocked, still set reasonable defaults
if status == 403:
_LOGGER.debug("GitHub API blocked/rate-limited for repo info %s/%s", owner, repo)
info.default_branch = "main"
# If description missing, fetch from GitHub HTML
if not info.description:
desc = await _github_description_html(hass, owner, repo)
if desc:
info.description = desc
ver, src = await _github_latest_version(hass, owner, repo)
info.latest_version = ver
info.latest_version_source = src
return info
if provider == "gitlab":
u = urlparse(repo_url.rstrip("/"))
base = f"{u.scheme}://{u.netloc}"
headers = {"User-Agent": UA}
project = quote_plus(f"{owner}/{repo}")
data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}", headers=headers)
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("path")) or repo
info.default_branch = data.get("default_branch") or "main"
ns = data.get("namespace")
if isinstance(ns, dict) and ns.get("path"):
info.owner = ns.get("path")
ver, src = await _gitlab_latest_version(hass, base, owner, repo)
info.latest_version = ver
info.latest_version_source = src
return info
if provider == "gitea":
u = urlparse(repo_url.rstrip("/"))
base = f"{u.scheme}://{u.netloc}"
data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}")
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("name")) or repo
info.default_branch = data.get("default_branch") or "main"
if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
info.owner = data["owner"]["login"]
ver, src = await _gitea_latest_version(hass, base, owner, repo)
info.latest_version = ver
info.latest_version_source = src
return info
return info
except Exception as e:
_LOGGER.debug("Provider fetch failed for %s: %s", repo_url, e)
return info

View File

@@ -0,0 +1,78 @@
from __future__ import annotations
import uuid
from dataclasses import dataclass
from typing import Any
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
_STORAGE_VERSION = 1
_STORAGE_KEY = "bcs_store"
@dataclass
class CustomRepo:
id: str
url: str
name: str | None = None
class BCSStorage:
"""Persistent storage for manually added repositories."""
def __init__(self, hass: HomeAssistant) -> None:
self.hass = hass
self._store = Store(hass, _STORAGE_VERSION, _STORAGE_KEY)
async def _load(self) -> dict[str, Any]:
data = await self._store.async_load()
if not data:
return {"custom_repos": []}
if "custom_repos" not in data:
data["custom_repos"] = []
return data
async def _save(self, data: dict[str, Any]) -> None:
await self._store.async_save(data)
async def list_custom_repos(self) -> list[CustomRepo]:
data = await self._load()
repos = data.get("custom_repos", [])
out: list[CustomRepo] = []
for r in repos:
if not isinstance(r, dict):
continue
rid = str(r.get("id") or "")
url = str(r.get("url") or "")
name = r.get("name")
if rid and url:
out.append(CustomRepo(id=rid, url=url, name=str(name) if name else None))
return out
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
data = await self._load()
repos = data.get("custom_repos", [])
# Deduplicate by URL
for r in repos:
if isinstance(r, dict) and str(r.get("url", "")).strip() == url.strip():
# Update name if provided
if name:
r["name"] = name
await self._save(data)
return CustomRepo(id=str(r["id"]), url=str(r["url"]), name=r.get("name"))
rid = f"custom:{uuid.uuid4().hex[:10]}"
entry = {"id": rid, "url": url.strip(), "name": name.strip() if name else None}
repos.append(entry)
data["custom_repos"] = repos
await self._save(data)
return CustomRepo(id=rid, url=entry["url"], name=entry["name"])
async def remove_custom_repo(self, repo_id: str) -> None:
data = await self._load()
repos = data.get("custom_repos", [])
data["custom_repos"] = [r for r in repos if not (isinstance(r, dict) and r.get("id") == repo_id)]
await self._save(data)

View File

@@ -0,0 +1,338 @@
from __future__ import annotations
import json
import logging
import shutil
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from aiohttp import web
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.components import persistent_notification
from homeassistant.util import yaml as ha_yaml
from homeassistant.components.http import HomeAssistantView
_LOGGER = logging.getLogger(__name__)
DOMAIN = "bahmcloud_store"
class StoreError(Exception):
"""Store error."""
@dataclass
class StoreConfig:
store_url: str
@dataclass
class Package:
id: str
name: str
type: str # "integration" | "store"
domain: str
repo: str
owner: str
repository: str
branch: str
source_path: str
# computed each refresh
latest_version: str | None = None
zip_url: str | None = None
release_url: str | None = None
class BahmcloudStore:
def __init__(self, hass: HomeAssistant, config: StoreConfig) -> None:
self.hass = hass
self.config = config
self.packages: dict[str, Package] = {}
self.refresh_seconds: int = 300
self._listeners: list[callable] = []
def add_listener(self, cb) -> None:
self._listeners.append(cb)
def signal_entities_updated(self) -> None:
for cb in list(self._listeners):
try:
cb()
except Exception:
pass
@staticmethod
def _zip_url(repo: str, branch: str) -> str:
return f"{repo.rstrip('/')}/archive/{branch}.zip"
@staticmethod
def _base_from_repo(repo_url: str) -> str:
u = urlparse(repo_url.rstrip("/"))
return f"{u.scheme}://{u.netloc}"
@staticmethod
def _raw_manifest_url(repo: str, branch: str, source_path: str) -> str:
# Example:
# https://git.bahmcloud.de/bahmcloud/easy_proxmox/raw/branch/main/custom_components/easy_proxmox/manifest.json
return f"{repo.rstrip('/')}/raw/branch/{branch}/{source_path.rstrip('/')}/manifest.json"
async def _fetch_latest_version(self, pkg: Package) -> tuple[str | None, str | None]:
"""
Returns (latest_version, release_url)
Strategy:
1) releases/latest -> tag_name
2) tags?limit=1 -> first tag name
3) fallback: read manifest.json from repo (version field)
"""
session = async_get_clientsession(self.hass)
base = self._base_from_repo(pkg.repo)
# 1) latest release
latest_release_api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}/releases/latest"
try:
async with session.get(latest_release_api, timeout=20) as resp:
if resp.status == 200:
data = await resp.json()
tag = data.get("tag_name")
html_url = data.get("html_url")
if tag:
return (str(tag), str(html_url) if html_url else None)
except Exception:
pass
# 2) tags fallback
tags_api = f"{base}/api/v1/repos/{pkg.owner}/{pkg.repository}/tags?limit=1"
try:
async with session.get(tags_api, timeout=20) as resp:
if resp.status == 200:
tags = await resp.json()
if tags and isinstance(tags, list):
name = tags[0].get("name")
if name:
return (str(name), None)
except Exception:
pass
# 3) fallback: manifest.json version from repo
try:
manifest_url = self._raw_manifest_url(pkg.repo, pkg.branch, pkg.source_path)
async with session.get(manifest_url, timeout=20) as resp:
if resp.status == 200:
text = await resp.text()
data = json.loads(text)
ver = data.get("version")
if ver:
return (str(ver), None)
except Exception:
pass
return (None, None)
async def refresh(self) -> None:
session = async_get_clientsession(self.hass)
try:
async with session.get(self.config.store_url, timeout=20) as resp:
if resp.status != 200:
raise StoreError(f"store_url returned {resp.status}")
raw = await resp.text()
except Exception as e:
raise StoreError(f"Failed fetching store index: {e}") from e
try:
data = ha_yaml.parse_yaml(raw)
if not isinstance(data, dict):
raise StoreError("store.yaml must be a mapping")
self.refresh_seconds = int(data.get("refresh_seconds", 300))
pkgs = data.get("packages", [])
parsed: dict[str, Package] = {}
for p in pkgs:
pkg = Package(
id=p["id"],
name=p.get("name", p["id"]),
type=p.get("type", "integration"),
domain=p["domain"],
repo=p["repo"],
owner=p["owner"],
repository=p["repository"],
branch=p.get("branch", "main"),
source_path=p["source_path"],
)
pkg.zip_url = self._zip_url(pkg.repo, pkg.branch)
parsed[pkg.id] = pkg
# compute latest versions
for pkg in parsed.values():
latest, rel_url = await self._fetch_latest_version(pkg)
pkg.latest_version = latest or "unknown"
pkg.release_url = rel_url
self.packages = parsed
except Exception as e:
raise StoreError(f"Invalid store.yaml: {e}") from e
def installed_version(self, domain: str) -> str | None:
manifest = Path(self.hass.config.path("custom_components", domain, "manifest.json"))
if not manifest.exists():
return None
try:
data = json.loads(manifest.read_text(encoding="utf-8"))
return str(data.get("version") or "unknown")
except Exception:
return "unknown"
def is_installed(self, domain: str) -> bool:
return Path(self.hass.config.path("custom_components", domain)).exists()
async def install_from_zip(self, pkg: Package) -> None:
"""Manual install/update: download ZIP and copy source_path into /config/custom_components/<domain>."""
if not pkg.zip_url:
raise StoreError("zip_url not set")
session = async_get_clientsession(self.hass)
with tempfile.TemporaryDirectory() as td:
zip_path = Path(td) / "repo.zip"
extract_dir = Path(td) / "extract"
async with session.get(pkg.zip_url, timeout=60) as resp:
if resp.status != 200:
raise StoreError(f"zip_url returned {resp.status}")
zip_path.write_bytes(await resp.read())
await self.hass.async_add_executor_job(self._extract_zip, zip_path, extract_dir)
src = self._find_source_path(extract_dir, pkg.source_path)
if not src:
raise StoreError(f"source_path not found in zip: {pkg.source_path}")
target = Path(self.hass.config.path("custom_components", pkg.domain))
if target.exists():
shutil.rmtree(target)
shutil.copytree(src, target)
# Nach Installation: Entities neu aufbauen (damit es als Update auftaucht)
self.signal_entities_updated()
persistent_notification.async_create(
self.hass,
(
f"**{pkg.name}** wurde installiert/aktualisiert.\n\n"
"Bitte Home Assistant **neu starten**, damit die Änderungen aktiv werden."
),
title="Bahmcloud Store",
notification_id=f"{DOMAIN}_{pkg.domain}_restart_required",
)
@staticmethod
def _extract_zip(zip_path: Path, extract_dir: Path) -> None:
extract_dir.mkdir(parents=True, exist_ok=True)
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
@staticmethod
def _find_source_path(extract_root: Path, source_path: str) -> Path | None:
direct = extract_root / source_path
if direct.exists():
return direct
for child in extract_root.iterdir():
candidate = child / source_path
if candidate.exists():
return candidate
return None
async def register_http_views(self) -> None:
"""Register HTTP views for static panel assets and JSON API."""
self.hass.http.register_view(_StaticView())
self.hass.http.register_view(_APIView(self))
class _StaticView(HomeAssistantView):
"""
IMPORTANT:
Custom Panel JS modules are loaded WITHOUT Authorization headers.
Therefore static panel assets must be publicly accessible (no auth).
"""
requires_auth = False
name = "bahmcloud_store_static"
url = "/api/bahmcloud_store_static/{path:.*}"
async def get(self, request, path):
base = Path(__file__).resolve().parent / "panel"
if not path:
path = "index.html"
f = (base / path).resolve()
if not str(f).startswith(str(base)) or not f.exists() or not f.is_file():
return web.Response(status=404, text="Not found")
suffix = f.suffix.lower()
if suffix == ".js":
return web.Response(body=f.read_bytes(), content_type="application/javascript")
if suffix == ".css":
return web.Response(body=f.read_bytes(), content_type="text/css")
if suffix in (".html", ".htm"):
return web.Response(body=f.read_bytes(), content_type="text/html")
return web.Response(body=f.read_bytes(), content_type="application/octet-stream")
class _APIView(HomeAssistantView):
"""
Auth-protected API:
GET /api/bahmcloud_store -> list packages
POST /api/bahmcloud_store {op:...} -> install/update a package
"""
requires_auth = True
name = "bahmcloud_store_api"
url = "/api/bahmcloud_store"
def __init__(self, store: BahmcloudStore) -> None:
self.store = store
async def get(self, request):
await self.store.refresh()
items: list[dict[str, Any]] = []
for pkg in self.store.packages.values():
installed = self.store.is_installed(pkg.domain)
items.append(
{
"id": pkg.id,
"name": pkg.name,
"domain": pkg.domain,
"type": pkg.type,
"installed": installed,
"installed_version": self.store.installed_version(pkg.domain) if installed else None,
"latest_version": pkg.latest_version,
"repo": pkg.repo,
"release_url": pkg.release_url,
}
)
return self.json({"packages": items, "store_url": self.store.config.store_url})
async def post(self, request):
data = await request.json()
op = data.get("op")
package_id = data.get("package_id")
if op not in ("install", "update"):
return self.json({"error": "unknown op"}, status_code=400)
if not package_id:
return self.json({"error": "package_id missing"}, status_code=400)
pkg = self.store.packages.get(package_id)
if not pkg:
return self.json({"error": "unknown package_id"}, status_code=404)
await self.store.install_from_zip(pkg)
return self.json({"ok": True})

View File

@@ -0,0 +1,17 @@
from __future__ import annotations
# NOTE:
# Update entities will be implemented once installation/provider resolution is in place.
# This stub prevents platform load errors and keeps the integration stable in 0.3.0.
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
async def async_setup_platform(
hass: HomeAssistant,
config,
async_add_entities: AddEntitiesCallback,
discovery_info=None,
):
return

View File

@@ -0,0 +1,316 @@
from __future__ import annotations
import base64
import logging
from dataclasses import asdict
from pathlib import Path
from typing import Any, TYPE_CHECKING
from aiohttp import web
from homeassistant.components.http import HomeAssistantView
if TYPE_CHECKING:
from .core import BCSCore # typing only
_LOGGER = logging.getLogger(__name__)
def _render_markdown_server_side(md: str) -> str | None:
"""Render Markdown -> sanitized HTML (server-side)."""
text = (md or "").strip()
if not text:
return None
html: str | None = None
# 1) python-markdown
try:
import markdown as mdlib # type: ignore
html = mdlib.markdown(
text,
extensions=["fenced_code", "tables", "sane_lists", "toc"],
output_format="html5",
)
except Exception as e:
_LOGGER.debug("python-markdown render failed: %s", e)
html = None
if not html:
return None
# 2) Sanitize via bleach
try:
import bleach # type: ignore
allowed_tags = [
"p",
"br",
"hr",
"div",
"span",
"blockquote",
"pre",
"code",
"h1",
"h2",
"h3",
"h4",
"h5",
"h6",
"ul",
"ol",
"li",
"strong",
"em",
"b",
"i",
"u",
"s",
"a",
"img",
"table",
"thead",
"tbody",
"tr",
"th",
"td",
]
allowed_attrs = {
"a": ["href", "title", "target", "rel"],
"img": ["src", "alt", "title"],
"th": ["align"],
"td": ["align"],
"*": ["class"],
}
sanitized = bleach.clean(
html,
tags=allowed_tags,
attributes=allowed_attrs,
protocols=["http", "https", "mailto"],
strip=True,
)
sanitized = sanitized.replace(
'<a href="',
'<a rel="noreferrer noopener" target="_blank" href="',
)
return sanitized
except Exception as e:
_LOGGER.debug("bleach sanitize failed/unavailable: %s", e)
return html
_TEXT_KEYS = ("readme", "markdown", "text", "content", "data", "body")
def _maybe_decode_base64(content: str, encoding: Any) -> str | None:
if not isinstance(content, str):
return None
enc = ""
if isinstance(encoding, str):
enc = encoding.strip().lower()
if "base64" not in enc:
return None
try:
raw = base64.b64decode(content.encode("utf-8"), validate=False)
return raw.decode("utf-8", errors="replace")
except Exception:
return None
def _extract_text_recursive(obj: Any, depth: int = 0) -> str | None:
"""
Robust extraction for README markdown.
Handles:
- str / bytes
- dict with:
- {content: "...", encoding: "base64"} (possibly nested)
- {readme: "..."} etc.
- list of dicts (pick first matching)
"""
if obj is None:
return None
if isinstance(obj, bytes):
try:
return obj.decode("utf-8", errors="replace")
except Exception:
return None
if isinstance(obj, str):
return obj
if depth > 8:
return None
if isinstance(obj, dict):
# 1) If it looks like "file content"
content = obj.get("content")
encoding = obj.get("encoding")
# Base64 decode if possible
decoded = _maybe_decode_base64(content, encoding)
if decoded:
return decoded
# content may already be plain text
if isinstance(content, str) and (not isinstance(encoding, str) or not encoding.strip()):
# Heuristic: treat as markdown if it has typical markdown chars, otherwise still return
return content
# 2) direct text keys (readme/markdown/text/body/data)
for k in _TEXT_KEYS:
v = obj.get(k)
if isinstance(v, str):
return v
if isinstance(v, bytes):
try:
return v.decode("utf-8", errors="replace")
except Exception:
pass
# 3) Sometimes nested under "file" / "result" / "payload" etc.
for v in obj.values():
out = _extract_text_recursive(v, depth + 1)
if out:
return out
return None
if isinstance(obj, list):
for item in obj:
out = _extract_text_recursive(item, depth + 1)
if out:
return out
return None
return None
class StaticAssetsView(HomeAssistantView):
url = "/api/bahmcloud_store_static/{path:.*}"
name = "api:bahmcloud_store_static"
requires_auth = False
async def get(self, request: web.Request, path: str) -> web.Response:
base = Path(__file__).resolve().parent / "panel"
base_resolved = base.resolve()
req_path = (path or "").lstrip("/")
if req_path == "":
req_path = "index.html"
target = (base / req_path).resolve()
if not str(target).startswith(str(base_resolved)):
return web.Response(status=404)
if target.is_dir():
target = (target / "index.html").resolve()
if not target.exists():
_LOGGER.error("BCS static asset not found: %s", target)
return web.Response(status=404)
content_type = "text/plain"
charset = None
if target.suffix == ".js":
content_type = "application/javascript"
charset = "utf-8"
elif target.suffix == ".html":
content_type = "text/html"
charset = "utf-8"
elif target.suffix == ".css":
content_type = "text/css"
charset = "utf-8"
elif target.suffix == ".svg":
content_type = "image/svg+xml"
elif target.suffix == ".png":
content_type = "image/png"
resp = web.Response(body=target.read_bytes(), content_type=content_type, charset=charset)
resp.headers["Cache-Control"] = "no-store, no-cache, must-revalidate, max-age=0"
resp.headers["Pragma"] = "no-cache"
return resp
class BCSApiView(HomeAssistantView):
url = "/api/bcs"
name = "api:bcs"
requires_auth = True
def __init__(self, core: Any) -> None:
self.core = core
async def get(self, request: web.Request) -> web.Response:
return web.json_response(
{"ok": True, "version": self.core.version, "repos": self.core.list_repos_public()}
)
async def post(self, request: web.Request) -> web.Response:
data = await request.json()
op = data.get("op")
if op == "add_custom_repo":
url = str(data.get("url") or "").strip()
name = data.get("name")
name = str(name).strip() if name else None
if not url:
return web.json_response({"ok": False, "message": "Missing url"}, status=400)
repo = await self.core.add_custom_repo(url=url, name=name)
return web.json_response({"ok": True, "repo": asdict(repo)})
return web.json_response({"ok": False, "message": "Unknown operation"}, status=400)
class BCSCustomRepoView(HomeAssistantView):
url = "/api/bcs/custom_repo"
name = "api:bcs_custom_repo"
requires_auth = True
def __init__(self, core: Any) -> None:
self.core = core
async def delete(self, request: web.Request) -> web.Response:
repo_id = request.query.get("id")
if not repo_id:
return web.json_response({"ok": False, "message": "Missing id"}, status=400)
await self.core.remove_custom_repo(repo_id)
return web.json_response({"ok": True})
class BCSReadmeView(HomeAssistantView):
url = "/api/bcs/readme"
name = "api:bcs_readme"
requires_auth = True
def __init__(self, core: Any) -> None:
self.core = core
async def get(self, request: web.Request) -> web.Response:
repo_id = request.query.get("repo_id")
if not repo_id:
return web.json_response({"ok": False, "message": "Missing repo_id"}, status=400)
maybe_md = await self.core.fetch_readme_markdown(repo_id)
md = _extract_text_recursive(maybe_md)
if not md or not md.strip():
t = type(maybe_md).__name__
return web.json_response(
{"ok": False, "message": f"README not found or unsupported format (got {t})."},
status=404,
)
# Ensure strict JSON string output (avoid accidental objects)
md_str = str(md)
html = _render_markdown_server_side(md_str)
return web.json_response({"ok": True, "readme": md_str, "html": html})