diff --git a/custom_components/bahmcloud_store/core.py b/custom_components/bahmcloud_store/core.py
index 87a0d65..1be7c91 100644
--- a/custom_components/bahmcloud_store/core.py
+++ b/custom_components/bahmcloud_store/core.py
@@ -95,6 +95,21 @@ class BCSCore:
         self._install_lock = asyncio.Lock()
         self._installed_cache: dict[str, Any] = {}
 
+        # Phase P1/P2: local repo cache + background enrichment
+        # The cache persists provider/meta/latest data so the UI can show more
+        # information immediately and we can later do delta refresh.
+        self._repo_cache: dict[str, Any] = {}
+        self._repo_cache_loaded: bool = False
+        self._repo_cache_flush_task: asyncio.Task | None = None
+
+        # Background enrichment worker (non-blocking)
+        self._bg_enrich_task: asyncio.Task | None = None
+        self._bg_enrich_pending: set[str] = set()
+        self._bg_enrich_ttl_seconds: int = 6 * 3600
+        self._bg_enrich_max_parallel: int = 3
+        self._bg_signal_interval_seconds: float = 2.0
+        self._bg_last_signal_ts: float = 0.0
+
         # Phase F2: backups before install/update
         self._backup_root = Path(self.hass.config.path(".bcs_backups"))
         self._backup_keep_per_domain: int = 5
@@ -104,6 +119,15 @@ class BCSCore:
         self.version = await self._read_manifest_version_async()
         await self._refresh_installed_cache()
 
+        # Load persisted repo cache once at startup.
+        try:
+            self._repo_cache = await self.storage.get_repo_cache_map()
+            if not isinstance(self._repo_cache, dict):
+                self._repo_cache = {}
+        except Exception:
+            self._repo_cache = {}
+        self._repo_cache_loaded = True
+
         # After a successful HA restart, restart-required is no longer relevant.
         self._clear_restart_required_issue()
 
@@ -210,6 +234,10 @@ class BCSCore:
         for r in merged.values():
             r.provider = detect_provider(r.url)
 
+        # Apply persisted cache (provider/meta/latest) to all repos so the list
+        # view can show richer data immediately.
+        self._apply_repo_cache(merged)
+
         await self._enrich_installed_only(merged)
         self.repos = merged
 
@@ -222,6 +250,108 @@
             len([r for r in self.repos.values() if r.source == "custom"]),
         )
 
+        # Start/continue background enrichment for repos (non-blocking).
+        self._schedule_background_enrich(list(self.repos.keys()))
+
+    def _cache_entry_is_stale(self, entry: dict[str, Any]) -> bool:
+        try:
+            checked_at = int(entry.get("checked_at") or 0)
+        except Exception:
+            checked_at = 0
+        if checked_at <= 0:
+            return True
+        return (time.time() - checked_at) > self._bg_enrich_ttl_seconds
+
+    def _schedule_background_enrich(self, repo_ids: list[str]) -> None:
+        """Queue repos for background enrichment and ensure worker is running."""
+        if not repo_ids:
+            return
+
+        for rid in repo_ids:
+            rid = str(rid)
+            r = self.repos.get(rid)
+            if not r:
+                continue
+
+            # Already enriched in memory? Still consider staleness from cache.
+            entry = self._repo_cache.get(rid) if isinstance(self._repo_cache, dict) else None
+            stale = True
+            if isinstance(entry, dict) and (entry.get("url") or "").strip() == (r.url or "").strip():
+                stale = self._cache_entry_is_stale(entry)
+
+            # If we already have fields in memory and the cache isn't stale, skip.
+            if (r.latest_version or r.meta_source or r.provider_description) and not stale:
+                continue
+
+            # Always enqueue missing/stale entries.
+            self._bg_enrich_pending.add(rid)
+
+        if self._bg_enrich_task and not self._bg_enrich_task.done():
+            return
+
+        self._bg_enrich_task = self.hass.async_create_task(self._background_enrich_worker())
+
+    async def _background_enrich_worker(self) -> None:
+        """Background worker to enrich repos and update the persistent cache."""
+        sem = asyncio.Semaphore(self._bg_enrich_max_parallel)
+
+        async def _enrich_one(rid: str) -> None:
+            async with sem:
+                r = self.repos.get(rid)
+                if not r:
+                    return
+
+                entry = self._repo_cache.get(rid) if isinstance(self._repo_cache, dict) else None
+                if isinstance(entry, dict) and (entry.get("url") or "").strip() == (r.url or "").strip():
+                    if not self._cache_entry_is_stale(entry) and (r.latest_version or r.meta_source or r.provider_description):
+                        return
+
+                try:
+                    await self._enrich_one_repo(r)
+                except Exception:
+                    _LOGGER.debug("BCS background enrich failed for %s", rid, exc_info=True)
+                    # still mark checked_at to avoid tight retry loops
+                    self._repo_cache[rid] = {
+                        "url": r.url,
+                        "checked_at": int(time.time()),
+                    }
+                    self._schedule_repo_cache_flush()
+                    return
+
+                # Update persistent cache entry
+                self._repo_cache[rid] = {
+                    "url": r.url,
+                    "provider": r.provider,
+                    "owner": r.owner,
+                    "provider_repo_name": r.provider_repo_name,
+                    "provider_description": r.provider_description,
+                    "default_branch": r.default_branch,
+                    "latest_version": r.latest_version,
+                    "latest_version_source": r.latest_version_source,
+                    "meta_source": r.meta_source,
+                    "meta_name": r.meta_name,
+                    "meta_description": r.meta_description,
+                    "meta_category": r.meta_category,
+                    "meta_author": r.meta_author,
+                    "meta_maintainer": r.meta_maintainer,
+                    "checked_at": int(time.time()),
+                }
+                self._schedule_repo_cache_flush()
+
+                # Throttle UI/entity updates
+                if (time.time() - self._bg_last_signal_ts) >= self._bg_signal_interval_seconds:
+                    self._bg_last_signal_ts = time.time()
+                    self.signal_updated()
+
+        while self._bg_enrich_pending:
+            # Drain in small batches so we don't monopolize the loop
+            batch: list[str] = []
+            while self._bg_enrich_pending and len(batch) < (self._bg_enrich_max_parallel * 2):
+                batch.append(self._bg_enrich_pending.pop())
+
+            await asyncio.gather(*(_enrich_one(rid) for rid in batch), return_exceptions=True)
+            await asyncio.sleep(0)  # yield
+
     async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None:
         sem = asyncio.Semaphore(6)
 
@@ -312,6 +442,9 @@
         if not r.name:
             r.name = r.provider_repo_name or r.url
 
+        # Persist into local cache (non-blocking, throttled flush).
+        self._update_repo_cache_from_item(r)
+
     async def ensure_repo_details(self, repo_id: str) -> RepoItem | None:
         """Ensure provider/meta/latest fields are loaded for a repo.
 
@@ -331,11 +464,99 @@
             _LOGGER.debug("BCS ensure_repo_details failed for %s", repo_id, exc_info=True)
         return r
 
-    async def list_repo_versions(self, repo_id: str) -> list[dict[str, Any]]:
-        repo = self.get_repo(repo_id)
-        if not repo:
-            return []
-        return await fetch_repo_versions(self.hass, repo.url)
+    # ---------------------------------------------------------------------
+    # Phase P1/P2: local cache + background enrichment
+    # ---------------------------------------------------------------------
+
+    def _apply_repo_cache(self, merged: dict[str, RepoItem]) -> None:
+        """Apply persisted cache fields to repo items.
+
+        This makes the list view richer immediately (without remote requests).
+        """
+        cache = self._repo_cache if isinstance(self._repo_cache, dict) else {}
+        now = int(time.time())
+
+        for rid, r in merged.items():
+            entry = cache.get(str(rid))
+            if not isinstance(entry, dict):
+                continue
+
+            # Safety: ensure cache belongs to the same URL.
+            if str(entry.get("url") or "").strip() != str(r.url or "").strip():
+                continue
+
+            # Provider fields
+            r.provider = entry.get("provider") or r.provider
+            r.owner = entry.get("owner") or r.owner
+            r.provider_repo_name = entry.get("provider_repo_name") or r.provider_repo_name
+            r.provider_description = entry.get("provider_description") or r.provider_description
+            r.default_branch = entry.get("default_branch") or r.default_branch
+
+            # Latest version
+            r.latest_version = entry.get("latest_version") or r.latest_version
+            r.latest_version_source = entry.get("latest_version_source") or r.latest_version_source
+
+            # Metadata
+            r.meta_source = entry.get("meta_source") or r.meta_source
+            r.meta_name = entry.get("meta_name") or r.meta_name
+            r.meta_description = entry.get("meta_description") or r.meta_description
+            r.meta_category = entry.get("meta_category") or r.meta_category
+            r.meta_author = entry.get("meta_author") or r.meta_author
+            r.meta_maintainer = entry.get("meta_maintainer") or r.meta_maintainer
+
+            # Stable display name
+            if r.meta_name:
+                r.name = r.meta_name
+            elif not r.name:
+                r.name = r.provider_repo_name or r.url
+
+            # Mark as stale if the cache is old (used by background enrich).
+            checked_at = int(entry.get("checked_at") or 0)
+            entry["_stale"] = (checked_at <= 0) or ((now - checked_at) > self._bg_enrich_ttl_seconds)
+
+    def _update_repo_cache_from_item(self, r: RepoItem) -> None:
+        """Update in-memory cache from a repo item and schedule a flush."""
+        if not self._repo_cache_loaded:
+            return
+
+        rid = str(r.id)
+        now = int(time.time())
+        entry = {
+            "url": str(r.url or ""),
+            "provider": r.provider,
+            "owner": r.owner,
+            "provider_repo_name": r.provider_repo_name,
+            "provider_description": r.provider_description,
+            "default_branch": r.default_branch,
+            "latest_version": r.latest_version,
+            "latest_version_source": r.latest_version_source,
+            "meta_source": r.meta_source,
+            "meta_name": r.meta_name,
+            "meta_description": r.meta_description,
+            "meta_category": r.meta_category,
+            "meta_author": r.meta_author,
+            "meta_maintainer": r.meta_maintainer,
+            "checked_at": now,
+        }
+
+        if not isinstance(self._repo_cache, dict):
+            self._repo_cache = {}
+        self._repo_cache[rid] = entry
+        self._schedule_repo_cache_flush()
+
+    def _schedule_repo_cache_flush(self) -> None:
+        if self._repo_cache_flush_task and not self._repo_cache_flush_task.done():
+            return
+
+        async def _flush_later() -> None:
+            try:
+                await asyncio.sleep(5)
+                if isinstance(self._repo_cache, dict):
+                    await self.storage.set_repo_cache_map(self._repo_cache)
+            except Exception:
+                _LOGGER.debug("BCS repo cache flush failed", exc_info=True)
+
+        self._repo_cache_flush_task = self.hass.async_create_task(_flush_later())
 
     def _add_cache_buster(self, url: str) -> str:
         parts = urlsplit(url)