"""Update platform for BCS: one update entity per installed repository."""
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from homeassistant.components.update import UpdateEntity, UpdateEntityFeature
|
|
from homeassistant.core import HomeAssistant, callback
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
|
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
|
from homeassistant.helpers.entity import EntityCategory
|
|
|
|
from .const import DOMAIN
|
|
from .core import SIGNAL_UPDATED, BCSCore
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_core(hass: HomeAssistant) -> BCSCore | None:
    """Look up the shared ``BCSCore`` instance stored in ``hass.data``.

    Two storage layouts are recognised:

    * ``hass.data[DOMAIN]`` is a dict holding the core under the ``"_core"`` key;
    * ``hass.data[DOMAIN]`` is the core object itself (backwards compatibility
      with older versions).

    Returns:
        The core, or ``None`` when nothing is stored or the stored value is
        not a ``BCSCore``.
    """
    stored = hass.data.get(DOMAIN)

    # Unwrap the dict layout; otherwise treat the stored value as the candidate
    # itself (older versions used hass.data[DOMAIN] = core).
    candidate = stored.get("_core") if isinstance(stored, dict) else stored

    if isinstance(candidate, BCSCore):
        return candidate
    return None
|
|
|
|
|
|
def _pretty_repo_name(core: BCSCore, repo_id: str) -> str:
    """Return a human-friendly name for a repo update entity.

    The ``name`` attribute of the object returned by ``core.get_repo(repo_id)``
    is used when it is non-empty after stripping whitespace. Otherwise the name
    is derived from ``repo_id``:

    * ``"index:<x>"``  -> ``"BCS Index <x>"``
    * ``"custom:<x>"`` -> ``"BCS Custom <x>"``
    * anything else    -> ``"BCS <repo_id>"``

    The repo lookup is best-effort: an exception raised while reading the repo
    never propagates. It is logged at debug level and the derived fallback
    name is returned instead.
    """
    name = ""
    try:
        repo = core.get_repo(repo_id)
        raw_name = getattr(repo, "name", None) if repo else None
        if raw_name:
            name = str(raw_name).strip()
    except Exception:  # noqa: BLE001 - naming must never break entity setup
        # Previously swallowed silently; keep the fallback but leave a trace.
        _LOGGER.debug(
            "Could not resolve display name for repo %s", repo_id, exc_info=True
        )
        name = ""

    if name:
        return name

    # Fallback: derive a label from the id's well-known prefixes.
    for prefix, label in (("index:", "Index"), ("custom:", "Custom")):
        if repo_id.startswith(prefix):
            return f"BCS {label} {repo_id[len(prefix):]}"
    return f"BCS {repo_id}"
|
|
|
|
|
|
@dataclass(frozen=True)
class _RepoKey:
    """Immutable, hashable wrapper around a repository id."""

    # Identifier of the repository this key refers to.
    repo_id: str
|
|
|
|
|
|
class BCSRepoUpdateEntity(UpdateEntity):
    """Update entity representing a BCS-managed repository.

    The entity holds no version data of its own: every property reads the
    current values from the shared ``BCSCore`` for its ``repo_id``.
    """

    _attr_entity_category = EntityCategory.DIAGNOSTIC
    _attr_supported_features = UpdateEntityFeature.INSTALL

    def __init__(self, core: BCSCore, repo_id: str) -> None:
        """Bind the entity to ``repo_id`` on ``core`` and compute its name."""
        self._core = core
        self._repo_id = repo_id
        self._in_progress = False

        # Stable unique id (do NOT change)
        self._attr_unique_id = f"{DOMAIN}:{repo_id}"

        self._refresh_display_name()

    def _refresh_display_name(self) -> None:
        """Recompute the display name and mirror it into the title."""
        self._attr_name = self._attr_title = _pretty_repo_name(
            self._core, self._repo_id
        )

    @property
    def available(self) -> bool:
        """Return True when the core knows both the repo and its install record."""
        repo = self._core.get_repo(self._repo_id)
        install_record = self._core.get_installed(self._repo_id)
        return not (repo is None or install_record is None)

    @property
    def in_progress(self) -> bool | None:
        """Return True while ``async_install`` is running for this entity."""
        return self._in_progress

    @property
    def installed_version(self) -> str | None:
        """Return the installed version as a string, or None when unknown.

        Reads ``"installed_version"`` from the install record and falls back
        to ``"ref"`` when that is missing or empty.
        """
        install_record = self._core.get_installed(self._repo_id) or {}
        version = install_record.get("installed_version") or install_record.get("ref")
        if not version:
            return None
        return str(version)

    @property
    def latest_version(self) -> str | None:
        """Return the repo's ``latest_version`` as a string, or None when unknown."""
        repo = self._core.get_repo(self._repo_id)
        version = getattr(repo, "latest_version", None) if repo else None
        if not version:
            return None
        return str(version)

    @property
    def update_available(self) -> bool:
        """Return True when both versions are known and they differ."""
        latest, installed = self.latest_version, self.installed_version
        if latest and installed:
            return latest != installed
        return False

    def version_is_newer(self, latest_version: str, installed_version: str) -> bool:
        """Treat any difference between the two versions as "newer".

        NOTE(review): presumably versions can be refs without an ordering,
        hence plain inequality - confirm against the core.
        """
        return latest_version != installed_version

    @property
    def release_url(self) -> str | None:
        """Return the repo's ``url`` attribute, or None when the repo is unknown."""
        repo = self._core.get_repo(self._repo_id)
        if not repo:
            return None
        return getattr(repo, "url", None)

    async def async_install(self, version: str | None, backup: bool, **kwargs: Any) -> None:
        """Run ``core.update_repo`` for this repository.

        ``version`` is only logged and otherwise ignored; ``backup`` and any
        extra keyword arguments are not used. The in-progress flag is set for
        the duration of the call and always cleared afterwards, with the state
        written on both transitions.
        """
        if version is not None:
            _LOGGER.debug("BCS update entity requested specific version=%s (ignored)", version)

        self._in_progress = True
        self.async_write_ha_state()

        try:
            await self._core.update_repo(self._repo_id)
        finally:
            # Clear the flag even when the update raised.
            self._in_progress = False
            self.async_write_ha_state()
|
|
|
|
|
|
@callback
def _sync_entities(core: BCSCore, existing: dict[str, BCSRepoUpdateEntity], async_add_entities: AddEntitiesCallback) -> None:
    """Ensure there is one update entity per installed repo AND keep names in sync.

    Args:
        core: Core whose ``_installed_cache`` mapping (repo id -> install
            record) is read; a missing or empty cache means "nothing installed".
        existing: Registry of entities created so far, keyed by repo id.
            Mutated in place: newly created entities are added to it.
        async_add_entities: Platform callback used to register new entities.

    Entries whose install record is not a dict are skipped. Entities for
    repos that are no longer in the cache are kept in ``existing``.
    """
    installed_map = getattr(core, "_installed_cache", {}) or {}
    new_entities: list[BCSRepoUpdateEntity] = []

    for repo_id, data in installed_map.items():
        if not isinstance(data, dict):
            continue

        known = existing.get(repo_id)
        if known is not None:
            # IMPORTANT: Update display name after refresh, when repo.name becomes available.
            known._refresh_display_name()
            continue

        ent = BCSRepoUpdateEntity(core, repo_id)
        existing[repo_id] = ent
        new_entities.append(ent)

    if new_entities:
        async_add_entities(new_entities)

    for ent in existing.values():
        # Entities handed to async_add_entities may not be attached to Home
        # Assistant yet (no hass / entity_id); writing state for them raises.
        # They get their first state written by the platform once added.
        if ent.hass is None or ent.entity_id is None:
            continue
        ent.async_write_ha_state()
|
|
|
|
|
|
@callback
def _async_setup_update_entities(
    hass: HomeAssistant,
    async_add_entities: AddEntitiesCallback,
    entry=None,
) -> None:
    """Create the update entities and re-sync them on every ``SIGNAL_UPDATED``.

    Does nothing (beyond a debug log) when no core is available. When
    ``entry`` provides ``async_on_unload``, the dispatcher subscription is
    registered there so the listener is dropped when the entry unloads,
    instead of lingering and acting on a stale entity registry.
    """
    core: BCSCore | None = _get_core(hass)
    if not core:
        _LOGGER.debug("BCS core not available, skipping update platform setup")
        return

    entities: dict[str, BCSRepoUpdateEntity] = {}

    _sync_entities(core, entities, async_add_entities)

    @callback
    def _handle_update() -> None:
        _sync_entities(core, entities, async_add_entities)

    unsubscribe = async_dispatcher_connect(hass, SIGNAL_UPDATED, _handle_update)

    # `entry` is untyped here, so only hook the unload when it supports it.
    on_unload = getattr(entry, "async_on_unload", None)
    if callable(on_unload):
        on_unload(unsubscribe)


async def async_setup_platform(
    hass: HomeAssistant,
    config,
    async_add_entities: AddEntitiesCallback,
    discovery_info=None,
):
    """Set up BCS update entities.

    ``config`` and ``discovery_info`` are accepted for the platform signature
    but not used.
    """
    _async_setup_update_entities(hass, async_add_entities)


async def async_setup_entry(
    hass: HomeAssistant,
    entry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up BCS update entities from a config entry.

    Same setup as ``async_setup_platform``, with the update listener tied to
    the lifetime of ``entry``.
    """
    _async_setup_update_entities(hass, async_add_entities, entry)