This commit is contained in:
2026-01-18 18:51:48 +00:00
parent 05897d4370
commit e19ca5bff1

View File

@@ -32,6 +32,10 @@ RESTART_REQUIRED_ISSUE_ID = "restart_required"
BACKUP_META_FILENAME = ".bcs_backup_meta.json" BACKUP_META_FILENAME = ".bcs_backup_meta.json"
# Optional HACS integrations index (GitHub repositories only).
HACS_INTEGRATIONS_URL = "https://data-v2.hacs.xyz/integration/repositories.json"
HACS_DEFAULT_CATEGORY = "Integrations"
class BCSError(Exception): class BCSError(Exception):
"""BCS core error.""" """BCS core error."""
@@ -95,20 +99,8 @@ class BCSCore:
self._install_lock = asyncio.Lock() self._install_lock = asyncio.Lock()
self._installed_cache: dict[str, Any] = {} self._installed_cache: dict[str, Any] = {}
# Phase P1/P2: local repo cache + background enrichment # Persistent settings (UI toggles etc.)
# The cache persists provider/meta/latest data so the UI can show more self.settings: dict[str, Any] = {"hacs_enabled": False}
# information immediately and we can later do delta refresh.
self._repo_cache: dict[str, Any] = {}
self._repo_cache_loaded: bool = False
self._repo_cache_flush_task: asyncio.Task | None = None
# Background enrichment worker (non-blocking)
self._bg_enrich_task: asyncio.Task | None = None
self._bg_enrich_pending: set[str] = set()
self._bg_enrich_ttl_seconds: int = 6 * 3600
self._bg_enrich_max_parallel: int = 3
self._bg_signal_interval_seconds: float = 2.0
self._bg_last_signal_ts: float = 0.0
# Phase F2: backups before install/update # Phase F2: backups before install/update
self._backup_root = Path(self.hass.config.path(".bcs_backups")) self._backup_root = Path(self.hass.config.path(".bcs_backups"))
@@ -119,14 +111,13 @@ class BCSCore:
self.version = await self._read_manifest_version_async() self.version = await self._read_manifest_version_async()
await self._refresh_installed_cache() await self._refresh_installed_cache()
# Load persisted repo cache once at startup. # Load persistent settings (do not fail startup)
try: try:
self._repo_cache = await self.storage.get_repo_cache_map() s = await self.storage.get_settings()
if not isinstance(self._repo_cache, dict): if isinstance(s, dict):
self._repo_cache = {} self.settings.update(s)
except Exception: except Exception:
self._repo_cache = {} pass
self._repo_cache_loaded = True
# After a successful HA restart, restart-required is no longer relevant. # After a successful HA restart, restart-required is no longer relevant.
self._clear_restart_required_issue() self._clear_restart_required_issue()
@@ -199,19 +190,48 @@ class BCSCore:
data = (self._installed_cache or {}).get(repo_id) data = (self._installed_cache or {}).get(repo_id)
return data if isinstance(data, dict) else None return data if isinstance(data, dict) else None
def get_settings_public(self) -> dict[str, Any]:
"""Return UI-relevant settings (no I/O)."""
return {
"hacs_enabled": bool(self.settings.get("hacs_enabled", False)),
}
async def set_settings(self, updates: dict[str, Any]) -> dict[str, Any]:
"""Persist settings and apply them."""
safe_updates: dict[str, Any] = {}
if "hacs_enabled" in (updates or {}):
safe_updates["hacs_enabled"] = bool(updates.get("hacs_enabled"))
merged = await self.storage.set_settings(safe_updates)
if isinstance(merged, dict):
self.settings.update(merged)
# Reload repo list after changing settings.
await self.full_refresh(source="settings")
return self.get_settings_public()
async def refresh(self) -> None: async def refresh(self) -> None:
index_repos, refresh_seconds = await self._load_index_repos() index_repos, refresh_seconds = await self._load_index_repos()
self.refresh_seconds = refresh_seconds self.refresh_seconds = refresh_seconds
hacs_enabled = bool(self.settings.get("hacs_enabled", False))
hacs_repos: list[RepoItem] = []
if hacs_enabled:
try:
hacs_repos = await self._load_hacs_repos()
except Exception as e:
_LOGGER.warning("BCS HACS index load failed: %s", e)
custom_repos = await self.storage.list_custom_repos() custom_repos = await self.storage.list_custom_repos()
# Fast path: if index + custom repos did not change, skip expensive work. # Fast path: if index + custom repos did not change, skip expensive work.
try: try:
custom_sig = [(c.id, (c.url or '').strip(), (c.name or '').strip()) for c in (custom_repos or [])] custom_sig = [(c.id, (c.url or '').strip(), (c.name or '').strip()) for c in (custom_repos or [])]
custom_sig.sort() custom_sig.sort()
refresh_signature = json.dumps({"index_hash": self.last_index_hash, "custom": custom_sig}, sort_keys=True) hacs_sig = len(hacs_repos) if hacs_enabled else 0
refresh_signature = json.dumps({"index_hash": self.last_index_hash, "custom": custom_sig, "hacs": hacs_sig, "hacs_enabled": hacs_enabled}, sort_keys=True)
except Exception: except Exception:
refresh_signature = f"{self.last_index_hash}:{len(custom_repos or [])}" refresh_signature = f"{self.last_index_hash}:{len(custom_repos or [])}:{'h' if hacs_enabled else 'n'}:{len(hacs_repos)}"
if self._last_refresh_signature and refresh_signature == self._last_refresh_signature and self.repos: if self._last_refresh_signature and refresh_signature == self._last_refresh_signature and self.repos:
_LOGGER.debug("BCS refresh skipped (no changes detected)") _LOGGER.debug("BCS refresh skipped (no changes detected)")
@@ -223,6 +243,9 @@ class BCSCore:
for item in index_repos: for item in index_repos:
merged[item.id] = item merged[item.id] = item
for item in hacs_repos:
merged[item.id] = item
for c in custom_repos: for c in custom_repos:
merged[c.id] = RepoItem( merged[c.id] = RepoItem(
id=c.id, id=c.id,
@@ -234,174 +257,57 @@ class BCSCore:
for r in merged.values(): for r in merged.values():
r.provider = detect_provider(r.url) r.provider = detect_provider(r.url)
# Apply persisted cache (provider/meta/latest) to all repos so the list
# view can show richer data immediately.
self._apply_repo_cache(merged)
await self._enrich_installed_only(merged) await self._enrich_installed_only(merged)
self.repos = merged self.repos = merged
self._last_refresh_signature = refresh_signature self._last_refresh_signature = refresh_signature
_LOGGER.info( _LOGGER.info(
"BCS refresh complete: repos=%s (index=%s, custom=%s)", "BCS refresh complete: repos=%s (index=%s, hacs=%s, custom=%s)",
len(self.repos), len(self.repos),
len([r for r in self.repos.values() if r.source == "index"]), len([r for r in self.repos.values() if r.source == "index"]),
len([r for r in self.repos.values() if r.source == "hacs"]),
len([r for r in self.repos.values() if r.source == "custom"]), len([r for r in self.repos.values() if r.source == "custom"]),
) )
# Start/continue background enrichment for repos (non-blocking). async def _load_hacs_repos(self) -> list[RepoItem]:
self._schedule_background_enrich(list(self.repos.keys())) """Load the official HACS integrations repository list.
def _apply_repo_cache(self, merged: dict[str, RepoItem]) -> None: This is used as an optional additional source to keep the local store index small.
"""Apply persisted cached enrichment data to repo items (no network IO).""" We only parse owner/repo strings and map them to GitHub URLs.
if not self._repo_cache_loaded or not isinstance(self._repo_cache, dict) or not self._repo_cache: """
return session = async_get_clientsession(self.hass)
headers = {
"User-Agent": "BahmcloudStore (Home Assistant)",
"Cache-Control": "no-cache, no-store, max-age=0",
"Pragma": "no-cache",
}
async with session.get(HACS_INTEGRATIONS_URL, timeout=60, headers=headers) as resp:
if resp.status != 200:
raise BCSError(f"HACS index returned {resp.status}")
data = await resp.json()
for rid, r in merged.items(): if not isinstance(data, list):
entry = self._repo_cache.get(str(rid)) raise BCSError("HACS repositories.json must be a list")
if not isinstance(entry, dict):
items: list[RepoItem] = []
for entry in data:
if not isinstance(entry, str):
continue continue
if (entry.get("url") or "").strip() != (r.url or "").strip(): full_name = entry.strip().strip("/")
if not full_name or "/" not in full_name:
continue continue
repo_id = f"hacs:{full_name.lower()}"
# Provider basics items.append(
r.provider = entry.get("provider") or r.provider RepoItem(
r.owner = entry.get("owner") or r.owner id=repo_id,
r.provider_repo_name = entry.get("provider_repo_name") or r.provider_repo_name name=full_name,
r.provider_description = entry.get("provider_description") or r.provider_description url=f"https://github.com/{full_name}",
r.default_branch = entry.get("default_branch") or r.default_branch source="hacs",
meta_category=HACS_DEFAULT_CATEGORY,
# Latest version )
r.latest_version = entry.get("latest_version") or r.latest_version )
r.latest_version_source = entry.get("latest_version_source") or r.latest_version_source return items
# Metadata
r.meta_source = entry.get("meta_source") or r.meta_source
r.meta_name = entry.get("meta_name") or r.meta_name
r.meta_description = entry.get("meta_description") or r.meta_description
r.meta_category = entry.get("meta_category") or r.meta_category
r.meta_author = entry.get("meta_author") or r.meta_author
r.meta_maintainer = entry.get("meta_maintainer") or r.meta_maintainer
# Keep a stable name fallback
if r.meta_name:
r.name = r.meta_name
elif not r.name:
r.name = r.provider_repo_name or r.url
def _schedule_repo_cache_flush(self) -> None:
if self._repo_cache_flush_task and not self._repo_cache_flush_task.done():
return
async def _flush_delayed() -> None:
await asyncio.sleep(5)
try:
await self.storage.set_repo_cache_map(self._repo_cache)
except Exception:
_LOGGER.debug("BCS repo cache flush failed", exc_info=True)
self._repo_cache_flush_task = self.hass.async_create_task(_flush_delayed())
def _cache_entry_is_stale(self, entry: dict[str, Any]) -> bool:
try:
checked_at = int(entry.get("checked_at") or 0)
except Exception:
checked_at = 0
if checked_at <= 0:
return True
return (time.time() - checked_at) > self._bg_enrich_ttl_seconds
def _schedule_background_enrich(self, repo_ids: list[str]) -> None:
"""Queue repos for background enrichment and ensure worker is running."""
if not repo_ids:
return
now = time.time()
for rid in repo_ids:
rid = str(rid)
r = self.repos.get(rid)
if not r:
continue
# Already enriched in memory? Still consider staleness from cache.
entry = self._repo_cache.get(rid) if isinstance(self._repo_cache, dict) else None
stale = True
if isinstance(entry, dict) and (entry.get("url") or "").strip() == (r.url or "").strip():
stale = self._cache_entry_is_stale(entry)
# If we already have fields in memory and the cache isn't stale, skip.
if (r.latest_version or r.meta_source or r.provider_description) and not stale:
continue
# Always enqueue missing/stale entries.
self._bg_enrich_pending.add(rid)
if self._bg_enrich_task and not self._bg_enrich_task.done():
return
self._bg_enrich_task = self.hass.async_create_task(self._background_enrich_worker())
async def _background_enrich_worker(self) -> None:
"""Background worker to enrich repos and update the persistent cache."""
sem = asyncio.Semaphore(self._bg_enrich_max_parallel)
async def _enrich_one(rid: str) -> None:
async with sem:
r = self.repos.get(rid)
if not r:
return
entry = self._repo_cache.get(rid) if isinstance(self._repo_cache, dict) else None
if isinstance(entry, dict) and (entry.get("url") or "").strip() == (r.url or "").strip():
if not self._cache_entry_is_stale(entry) and (r.latest_version or r.meta_source or r.provider_description):
return
try:
await self._enrich_one_repo(r)
except Exception:
_LOGGER.debug("BCS background enrich failed for %s", rid, exc_info=True)
# still mark checked_at to avoid tight retry loops
self._repo_cache[rid] = {
"url": r.url,
"checked_at": int(time.time()),
}
self._schedule_repo_cache_flush()
return
# Update persistent cache entry
self._repo_cache[rid] = {
"url": r.url,
"provider": r.provider,
"owner": r.owner,
"provider_repo_name": r.provider_repo_name,
"provider_description": r.provider_description,
"default_branch": r.default_branch,
"latest_version": r.latest_version,
"latest_version_source": r.latest_version_source,
"meta_source": r.meta_source,
"meta_name": r.meta_name,
"meta_description": r.meta_description,
"meta_category": r.meta_category,
"meta_author": r.meta_author,
"meta_maintainer": r.meta_maintainer,
"checked_at": int(time.time()),
}
self._schedule_repo_cache_flush()
# Throttle UI/entity updates
if (time.time() - self._bg_last_signal_ts) >= self._bg_signal_interval_seconds:
self._bg_last_signal_ts = time.time()
self.signal_updated()
while self._bg_enrich_pending:
# Drain in small batches so we don't monopolize the loop
batch: list[str] = []
while self._bg_enrich_pending and len(batch) < (self._bg_enrich_max_parallel * 2):
batch.append(self._bg_enrich_pending.pop())
await asyncio.gather(*(_enrich_one(rid) for rid in batch), return_exceptions=True)
await asyncio.sleep(0) # yield
async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None: async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None:
sem = asyncio.Semaphore(6) sem = asyncio.Semaphore(6)
@@ -493,9 +399,6 @@ class BCSCore:
if not r.name: if not r.name:
r.name = r.provider_repo_name or r.url r.name = r.provider_repo_name or r.url
# Persist into local cache (non-blocking, throttled flush).
self._update_repo_cache_from_item(r)
async def ensure_repo_details(self, repo_id: str) -> RepoItem | None: async def ensure_repo_details(self, repo_id: str) -> RepoItem | None:
"""Ensure provider/meta/latest fields are loaded for a repo. """Ensure provider/meta/latest fields are loaded for a repo.
@@ -515,99 +418,11 @@ class BCSCore:
_LOGGER.debug("BCS ensure_repo_details failed for %s", repo_id, exc_info=True) _LOGGER.debug("BCS ensure_repo_details failed for %s", repo_id, exc_info=True)
return r return r
# --------------------------------------------------------------------- async def list_repo_versions(self, repo_id: str) -> list[dict[str, Any]]:
# Phase P1/P2: local cache + background enrichment repo = self.get_repo(repo_id)
# --------------------------------------------------------------------- if not repo:
return []
def _apply_repo_cache(self, merged: dict[str, RepoItem]) -> None: return await fetch_repo_versions(self.hass, repo.url)
"""Apply persisted cache fields to repo items.
This makes the list view richer immediately (without remote requests).
"""
cache = self._repo_cache if isinstance(self._repo_cache, dict) else {}
now = int(time.time())
for rid, r in merged.items():
entry = cache.get(str(rid))
if not isinstance(entry, dict):
continue
# Safety: ensure cache belongs to the same URL.
if str(entry.get("url") or "").strip() != str(r.url or "").strip():
continue
# Provider fields
r.provider = entry.get("provider") or r.provider
r.owner = entry.get("owner") or r.owner
r.provider_repo_name = entry.get("provider_repo_name") or r.provider_repo_name
r.provider_description = entry.get("provider_description") or r.provider_description
r.default_branch = entry.get("default_branch") or r.default_branch
# Latest version
r.latest_version = entry.get("latest_version") or r.latest_version
r.latest_version_source = entry.get("latest_version_source") or r.latest_version_source
# Metadata
r.meta_source = entry.get("meta_source") or r.meta_source
r.meta_name = entry.get("meta_name") or r.meta_name
r.meta_description = entry.get("meta_description") or r.meta_description
r.meta_category = entry.get("meta_category") or r.meta_category
r.meta_author = entry.get("meta_author") or r.meta_author
r.meta_maintainer = entry.get("meta_maintainer") or r.meta_maintainer
# Stable display name
if r.meta_name:
r.name = r.meta_name
elif not r.name:
r.name = r.provider_repo_name or r.url
# Mark as stale if the cache is old (used by background enrich).
checked_at = int(entry.get("checked_at") or 0)
entry["_stale"] = (checked_at <= 0) or ((now - checked_at) > self._bg_enrich_ttl_seconds)
def _update_repo_cache_from_item(self, r: RepoItem) -> None:
"""Update in-memory cache from a repo item and schedule a flush."""
if not self._repo_cache_loaded:
return
rid = str(r.id)
now = int(time.time())
entry = {
"url": str(r.url or ""),
"provider": r.provider,
"owner": r.owner,
"provider_repo_name": r.provider_repo_name,
"provider_description": r.provider_description,
"default_branch": r.default_branch,
"latest_version": r.latest_version,
"latest_version_source": r.latest_version_source,
"meta_source": r.meta_source,
"meta_name": r.meta_name,
"meta_description": r.meta_description,
"meta_category": r.meta_category,
"meta_author": r.meta_author,
"meta_maintainer": r.meta_maintainer,
"checked_at": now,
}
if not isinstance(self._repo_cache, dict):
self._repo_cache = {}
self._repo_cache[rid] = entry
self._schedule_repo_cache_flush()
def _schedule_repo_cache_flush(self) -> None:
if self._repo_cache_flush_task and not self._repo_cache_flush_task.done():
return
async def _flush_later() -> None:
try:
await asyncio.sleep(5)
if isinstance(self._repo_cache, dict):
await self.storage.set_repo_cache_map(self._repo_cache)
except Exception:
_LOGGER.debug("BCS repo cache flush failed", exc_info=True)
self._repo_cache_flush_task = self.hass.async_create_task(_flush_later())
def _add_cache_buster(self, url: str) -> str: def _add_cache_buster(self, url: str) -> str:
parts = urlsplit(url) parts = urlsplit(url)