This commit is contained in:
2026-01-16 19:10:35 +00:00
parent 5796012189
commit c04612e159

View File

@@ -1,583 +1,166 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import hashlib
import json
import logging import logging
import time
import shutil
import tempfile
import zipfile
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path
from typing import Any from typing import Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit, urlparse
from homeassistant.core import HomeAssistant from homeassistant.components.update import UpdateEntity, UpdateEntityFeature
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.core import HomeAssistant, callback
from homeassistant.components import persistent_notification from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.util import yaml as ha_yaml from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.entity import EntityCategory
from .storage import BCSStorage, CustomRepo from .core import DOMAIN, SIGNAL_UPDATED, BCSCore
from .providers import fetch_repo_info, detect_provider, RepoInfo, fetch_readme_markdown
from .metadata import fetch_repo_metadata, RepoMetadata
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DOMAIN = "bahmcloud_store"
def _pretty_repo_name(core: BCSCore, repo_id: str) -> str:
class BCSError(Exception): """Return a human-friendly name for a repo update entity."""
"""BCS core error."""
class BCSInstallError(BCSError):
"""BCS installation/update error."""
@dataclass
class BCSConfig:
store_url: str
@dataclass
class RepoItem:
id: str
name: str
url: str
source: str # "index" | "custom"
owner: str | None = None
provider: str | None = None
provider_repo_name: str | None = None
provider_description: str | None = None
default_branch: str | None = None
latest_version: str | None = None
latest_version_source: str | None = None # "release" | "tag" | "atom" | None
meta_source: str | None = None
meta_name: str | None = None
meta_description: str | None = None
meta_category: str | None = None
meta_author: str | None = None
meta_maintainer: str | None = None
class BCSCore:
def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None:
self.hass = hass
self.config = config
self.storage = BCSStorage(hass)
self.refresh_seconds: int = 300
self.repos: dict[str, RepoItem] = {}
self._listeners: list[callable] = []
# Will be loaded asynchronously (no blocking IO in event loop)
self.version: str = "unknown"
# Diagnostics (helps verify refresh behavior)
self.last_index_url: str | None = None
self.last_index_bytes: int | None = None
self.last_index_hash: str | None = None
self.last_index_loaded_at: float | None = None
self._install_lock = asyncio.Lock()
self._installed_cache: dict[str, Any] = {}
async def async_initialize(self) -> None:
"""Async initialization that avoids blocking file IO."""
self.version = await self._read_manifest_version_async()
await self._refresh_installed_cache()
async def _read_manifest_version_async(self) -> str:
def _read() -> str:
try: try:
manifest_path = Path(__file__).resolve().parent / "manifest.json" repo = core.get_repo(repo_id)
data = json.loads(manifest_path.read_text(encoding="utf-8")) if repo and getattr(repo, "name", None):
v = data.get("version") name = str(repo.name).strip()
return str(v) if v else "unknown" if name:
except Exception: return name
return "unknown"
return await self.hass.async_add_executor_job(_read)
def add_listener(self, cb) -> None:
self._listeners.append(cb)
def signal_updated(self) -> None:
for cb in list(self._listeners):
try:
cb()
except Exception: except Exception:
pass pass
async def full_refresh(self, source: str = "manual") -> None: # Fallbacks
"""Single refresh entry-point used by both timer and manual button.""" if repo_id.startswith("index:"):
_LOGGER.info("BCS full refresh triggered (source=%s)", source) return f"BCS Index {repo_id.split(':', 1)[1]}"
await self.refresh() if repo_id.startswith("custom:"):
self.signal_updated() return f"BCS Custom {repo_id.split(':', 1)[1]}"
return f"BCS {repo_id}"
def get_repo(self, repo_id: str) -> RepoItem | None:
return self.repos.get(repo_id)
async def refresh(self) -> None: @dataclass(frozen=True)
index_repos, refresh_seconds = await self._load_index_repos() class _RepoKey:
self.refresh_seconds = refresh_seconds repo_id: str
custom_repos = await self.storage.list_custom_repos()
merged: dict[str, RepoItem] = {} class BCSRepoUpdateEntity(UpdateEntity):
"""Update entity representing a BCS-managed repository."""
for item in index_repos: _attr_entity_category = EntityCategory.DIAGNOSTIC
merged[item.id] = item _attr_supported_features = UpdateEntityFeature.INSTALL
for c in custom_repos: def __init__(self, core: BCSCore, repo_id: str) -> None:
merged[c.id] = RepoItem( self._core = core
id=c.id, self._repo_id = repo_id
name=(c.name or c.url), self._in_progress = False
url=c.url,
source="custom",
)
for r in merged.values(): # Stable unique id (do NOT change)
r.provider = detect_provider(r.url) self._attr_unique_id = f"{DOMAIN}:{repo_id}"
await self._enrich_and_resolve(merged) # Human-friendly name in UI
self.repos = merged pretty = _pretty_repo_name(core, repo_id)
self._attr_name = pretty
_LOGGER.info( # Title shown in the entity dialog
"BCS refresh complete: repos=%s (index=%s, custom=%s)", self._attr_title = pretty
len(self.repos),
len([r for r in self.repos.values() if r.source == "index"]),
len([r for r in self.repos.values() if r.source == "custom"]),
)
async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None: @property
sem = asyncio.Semaphore(6) def available(self) -> bool:
repo = self._core.get_repo(self._repo_id)
installed = self._core.get_installed(self._repo_id)
return repo is not None and installed is not None
async def process_one(r: RepoItem) -> None: @property
async with sem: def in_progress(self) -> bool | None:
info: RepoInfo = await fetch_repo_info(self.hass, r.url) return self._in_progress
r.provider = info.provider or r.provider @property
r.owner = info.owner or r.owner def installed_version(self) -> str | None:
r.provider_repo_name = info.repo_name installed = self._core.get_installed(self._repo_id) or {}
r.provider_description = info.description v = installed.get("installed_version") or installed.get("ref")
r.default_branch = info.default_branch or r.default_branch
r.latest_version = info.latest_version
r.latest_version_source = info.latest_version_source
md: RepoMetadata = await fetch_repo_metadata(self.hass, r.url, r.default_branch)
r.meta_source = md.source
r.meta_name = md.name
r.meta_description = md.description
r.meta_category = md.category
r.meta_author = md.author
r.meta_maintainer = md.maintainer
has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http"))
if r.meta_name:
r.name = r.meta_name
elif not has_user_or_index_name and r.provider_repo_name:
r.name = r.provider_repo_name
elif not r.name:
r.name = r.url
await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True)
def _add_cache_buster(self, url: str) -> str:
parts = urlsplit(url)
q = dict(parse_qsl(parts.query, keep_blank_values=True))
q["t"] = str(int(time.time()))
new_query = urlencode(q)
return urlunsplit((parts.scheme, parts.netloc, parts.path, new_query, parts.fragment))
def _gitea_src_to_raw(self, url: str) -> str:
parts = urlsplit(url)
path = parts.path
path2 = path.replace("/src/branch/", "/raw/branch/")
if path2 == path:
return url
return urlunsplit((parts.scheme, parts.netloc, path2, parts.query, parts.fragment))
async def _fetch_store_text(self, url: str) -> str:
session = async_get_clientsession(self.hass)
headers = {
"User-Agent": "BahmcloudStore (Home Assistant)",
"Cache-Control": "no-cache, no-store, max-age=0",
"Pragma": "no-cache",
"Expires": "0",
}
async with session.get(url, timeout=30, headers=headers) as resp:
if resp.status != 200:
raise BCSError(f"store_url returned {resp.status}")
return await resp.text()
async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
store_url = (self.config.store_url or "").strip()
if not store_url:
raise BCSError("store_url is empty")
url = self._add_cache_buster(store_url)
try:
raw = await self._fetch_store_text(url)
# If we fetched a HTML page (wrong endpoint), attempt raw conversion.
if "<html" in raw.lower() or "<!doctype html" in raw.lower():
fallback = self._add_cache_buster(self._gitea_src_to_raw(store_url))
if fallback != url:
_LOGGER.warning("BCS store index looked like HTML, retrying raw URL")
raw = await self._fetch_store_text(fallback)
url = fallback
except Exception as e:
raise BCSError(f"Failed fetching store index: {e}") from e
# Diagnostics
b = raw.encode("utf-8", errors="replace")
h = hashlib.sha256(b).hexdigest()[:12]
self.last_index_url = url
self.last_index_bytes = len(b)
self.last_index_hash = h
self.last_index_loaded_at = time.time()
_LOGGER.info(
"BCS index loaded: url=%s bytes=%s sha=%s",
self.last_index_url,
self.last_index_bytes,
self.last_index_hash,
)
try:
data = ha_yaml.parse_yaml(raw)
if not isinstance(data, dict):
raise BCSError("store.yaml must be a mapping")
refresh_seconds = int(data.get("refresh_seconds", 300))
repos = data.get("repos", [])
if not isinstance(repos, list):
raise BCSError("store.yaml 'repos' must be a list")
items: list[RepoItem] = []
for i, r in enumerate(repos):
if not isinstance(r, dict):
continue
repo_url = str(r.get("url", "")).strip()
if not repo_url:
continue
name = str(r.get("name") or repo_url).strip()
items.append(
RepoItem(
id=f"index:{i}",
name=name,
url=repo_url,
source="index",
)
)
_LOGGER.info("BCS index parsed: repos=%s refresh_seconds=%s", len(items), refresh_seconds)
return items, refresh_seconds
except Exception as e:
raise BCSError(f"Invalid store.yaml: {e}") from e
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
url = str(url or "").strip()
if not url:
raise BCSError("Missing url")
c = await self.storage.add_custom_repo(url, name)
await self.full_refresh(source="custom_repo_add")
return c
async def remove_custom_repo(self, repo_id: str) -> None:
await self.storage.remove_custom_repo(repo_id)
await self.full_refresh(source="custom_repo_remove")
async def list_custom_repos(self) -> list[CustomRepo]:
return await self.storage.list_custom_repos()
def list_repos_public(self) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
installed_map: dict[str, Any] = getattr(self, "_installed_cache", {}) or {}
if not isinstance(installed_map, dict):
installed_map = {}
for r in self.repos.values():
inst = installed_map.get(r.id)
installed = bool(inst)
installed_domains: list[str] = []
installed_version: str | None = None
installed_manifest_version: str | None = None
if isinstance(inst, dict):
d = inst.get("domains") or []
if isinstance(d, list):
installed_domains = [str(x) for x in d if str(x).strip()]
# IMPORTANT: this is the ref we installed (tag/release/branch)
v = inst.get("installed_version")
installed_version = str(v) if v is not None else None
mv = inst.get("installed_manifest_version")
installed_manifest_version = str(mv) if mv is not None else None
out.append(
{
"id": r.id,
"name": r.name,
"url": r.url,
"source": r.source,
"owner": r.owner,
"provider": r.provider,
"repo_name": r.provider_repo_name,
"description": r.provider_description or r.meta_description,
"default_branch": r.default_branch,
"latest_version": r.latest_version,
"latest_version_source": r.latest_version_source,
"category": r.meta_category,
"meta_author": r.meta_author,
"meta_maintainer": r.meta_maintainer,
"meta_source": r.meta_source,
"installed": installed,
"installed_version": installed_version,
"installed_manifest_version": installed_manifest_version,
"installed_domains": installed_domains,
}
)
return out
async def fetch_readme_markdown(self, repo_id: str) -> str | None:
repo = self.get_repo(repo_id)
if not repo:
return None
return await fetch_readme_markdown(
self.hass,
repo.url,
provider=repo.provider,
default_branch=repo.default_branch,
)
def _pick_ref_for_install(self, repo: RepoItem) -> str:
# Prefer latest_version (release/tag/atom-derived), fallback to default branch, then main.
if repo.latest_version and str(repo.latest_version).strip():
return str(repo.latest_version).strip()
if repo.default_branch and str(repo.default_branch).strip():
return str(repo.default_branch).strip()
return "main"
def _build_zip_url(self, repo_url: str, ref: str) -> str:
"""Build a public ZIP download URL (provider-neutral, no tokens).
Supports:
- GitHub: codeload
- GitLab: /-/archive/
- Gitea (incl. Bahmcloud): /archive/<ref>.zip
"""
ref = (ref or "").strip()
if not ref:
raise BCSInstallError("Missing ref for ZIP download")
u = urlparse(repo_url.rstrip("/"))
host = (u.netloc or "").lower()
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
raise BCSInstallError("Invalid repository URL (missing owner/repo)")
owner = parts[0]
repo = parts[1]
if repo.endswith(".git"):
repo = repo[:-4]
if "github.com" in host:
return f"https://codeload.github.com/{owner}/{repo}/zip/{ref}"
if "gitlab" in host:
base = f"{u.scheme}://{u.netloc}"
path = u.path.strip("/")
if path.endswith(".git"):
path = path[:-4]
return f"{base}/{path}/-/archive/{ref}/{repo}-{ref}.zip"
base = f"{u.scheme}://{u.netloc}"
path = u.path.strip("/")
if path.endswith(".git"):
path = path[:-4]
return f"{base}/{path}/archive/{ref}.zip"
async def _download_zip(self, url: str, dest: Path) -> None:
session = async_get_clientsession(self.hass)
headers = {
"User-Agent": "BahmcloudStore (Home Assistant)",
"Cache-Control": "no-cache, no-store, max-age=0",
"Pragma": "no-cache",
}
async with session.get(url, timeout=120, headers=headers) as resp:
if resp.status != 200:
raise BCSInstallError(f"zip_url returned {resp.status}")
data = await resp.read()
await self.hass.async_add_executor_job(dest.write_bytes, data)
async def _extract_zip(self, zip_path: Path, extract_dir: Path) -> None:
def _extract() -> None:
with zipfile.ZipFile(zip_path, "r") as zf:
zf.extractall(extract_dir)
await self.hass.async_add_executor_job(_extract)
@staticmethod
def _find_custom_components_root(extract_root: Path) -> Path | None:
direct = extract_root / "custom_components"
if direct.exists() and direct.is_dir():
return direct
for child in extract_root.iterdir():
candidate = child / "custom_components"
if candidate.exists() and candidate.is_dir():
return candidate
return None
async def _copy_domain_dir(self, src_domain_dir: Path, domain: str) -> None:
dest_root = Path(self.hass.config.path("custom_components"))
target = dest_root / domain
tmp_target = dest_root / f".bcs_tmp_{domain}_{int(time.time())}"
def _copy() -> None:
if tmp_target.exists():
shutil.rmtree(tmp_target, ignore_errors=True)
shutil.copytree(src_domain_dir, tmp_target, dirs_exist_ok=True)
if target.exists():
shutil.rmtree(target, ignore_errors=True)
tmp_target.rename(target)
await self.hass.async_add_executor_job(_copy)
async def _read_installed_manifest_version(self, domain: str) -> str | None:
def _read() -> str | None:
try:
p = Path(self.hass.config.path("custom_components", domain, "manifest.json"))
if not p.exists():
return None
data = json.loads(p.read_text(encoding="utf-8"))
v = data.get("version")
return str(v) if v else None return str(v) if v else None
except Exception:
return None
return await self.hass.async_add_executor_job(_read) @property
def latest_version(self) -> str | None:
async def _refresh_installed_cache(self) -> None: repo = self._core.get_repo(self._repo_id)
try:
items = await self.storage.list_installed_repos()
cache: dict[str, Any] = {}
for it in items:
cache[it.repo_id] = {
"domains": it.domains,
"installed_version": it.installed_version, # BCS ref
"installed_manifest_version": it.installed_manifest_version,
"ref": it.ref,
"installed_at": it.installed_at,
}
self._installed_cache = cache
except Exception:
self._installed_cache = {}
async def install_repo(self, repo_id: str) -> dict[str, Any]:
repo = self.get_repo(repo_id)
if not repo: if not repo:
raise BCSInstallError(f"repo_id not found: {repo_id}") return None
v = getattr(repo, "latest_version", None)
return str(v) if v else None
async with self._install_lock: @property
ref = self._pick_ref_for_install(repo) def update_available(self) -> bool:
zip_url = self._build_zip_url(repo.url, ref) latest = self.latest_version
installed = self.installed_version
if not latest or not installed:
return False
return latest != installed
_LOGGER.info("BCS install started: repo_id=%s ref=%s zip_url=%s", repo_id, ref, zip_url) def version_is_newer(self, latest_version: str, installed_version: str) -> bool:
return latest_version != installed_version
with tempfile.TemporaryDirectory(prefix="bcs_install_") as td: @property
tmp = Path(td) def release_url(self) -> str | None:
zip_path = tmp / "repo.zip" repo = self._core.get_repo(self._repo_id)
extract_dir = tmp / "extract" return getattr(repo, "url", None) if repo else None
extract_dir.mkdir(parents=True, exist_ok=True)
await self._download_zip(zip_url, zip_path) async def async_install(self, version: str | None, backup: bool, **kwargs: Any) -> None:
await self._extract_zip(zip_path, extract_dir) if version is not None:
_LOGGER.debug(
"BCS update entity requested specific version=%s (ignored)", version
)
cc_root = self._find_custom_components_root(extract_dir) self._in_progress = True
if not cc_root: self.async_write_ha_state()
raise BCSInstallError("custom_components folder not found in repository ZIP")
installed_domains: list[str] = [] try:
for domain_dir in cc_root.iterdir(): await self._core.update_repo(self._repo_id)
if not domain_dir.is_dir(): finally:
self._in_progress = False
self.async_write_ha_state()
@callback
def _sync_entities(
core: BCSCore,
existing: dict[str, BCSRepoUpdateEntity],
async_add_entities: AddEntitiesCallback,
) -> None:
"""Ensure there is one update entity per installed repo."""
installed_map = getattr(core, "_installed_cache", {}) or {}
new_entities: list[BCSRepoUpdateEntity] = []
for repo_id, data in installed_map.items():
if not isinstance(data, dict):
continue continue
manifest = domain_dir / "manifest.json" if repo_id in existing:
if not manifest.exists():
continue continue
domain = domain_dir.name ent = BCSRepoUpdateEntity(core, repo_id)
await self._copy_domain_dir(domain_dir, domain) existing[repo_id] = ent
installed_domains.append(domain) new_entities.append(ent)
if not installed_domains: if new_entities:
raise BCSInstallError("No integrations found under custom_components/ (missing manifest.json)") async_add_entities(new_entities)
# informational only (many repos are wrong here) for ent in existing.values():
installed_manifest_version = await self._read_installed_manifest_version(installed_domains[0]) ent.async_write_ha_state()
# IMPORTANT: BCS "installed_version" is the ref we installed (tag/release/branch),
# so update logic won't break when manifest.json is 0.0.0 or outdated.
installed_version = ref
await self.storage.set_installed_repo( async def async_setup_platform(
repo_id=repo_id, hass: HomeAssistant,
url=repo.url, config,
domains=installed_domains, async_add_entities: AddEntitiesCallback,
installed_version=installed_version, discovery_info=None,
installed_manifest_version=installed_manifest_version, ):
ref=ref, """Set up BCS update entities."""
) core: BCSCore | None = hass.data.get(DOMAIN)
await self._refresh_installed_cache() if not core:
_LOGGER.debug("BCS core not available, skipping update platform setup")
return
persistent_notification.async_create( entities: dict[str, BCSRepoUpdateEntity] = {}
self.hass,
"Bahmcloud Store installation finished. A Home Assistant restart is required to load the integration.",
title="Bahmcloud Store",
notification_id="bcs_restart_required",
)
_LOGGER.info( _sync_entities(core, entities, async_add_entities)
"BCS install complete: repo_id=%s domains=%s installed_ref=%s manifest_version=%s",
repo_id,
installed_domains,
installed_version,
installed_manifest_version,
)
self.signal_updated()
return {
"ok": True,
"repo_id": repo_id,
"domains": installed_domains,
"installed_version": installed_version,
"installed_manifest_version": installed_manifest_version,
"restart_required": True,
}
async def update_repo(self, repo_id: str) -> dict[str, Any]: @callback
_LOGGER.info("BCS update started: repo_id=%s", repo_id) def _handle_update() -> None:
return await self.install_repo(repo_id) _sync_entities(core, entities, async_add_entities)
async def request_restart(self) -> None: async_dispatcher_connect(hass, SIGNAL_UPDATED, _handle_update)
await self.hass.services.async_call("homeassistant", "restart", {}, blocking=False)