Add backup

This commit is contained in:
2026-01-18 07:45:34 +00:00
parent 318d517575
commit a8e247d288

View File

@@ -90,6 +90,10 @@ class BCSCore:
self._install_lock = asyncio.Lock()
self._installed_cache: dict[str, Any] = {}
# Phase F2: backups before install/update
self._backup_root = Path(self.hass.config.path(".bcs_backups"))
self._backup_keep_per_domain: int = 5
async def async_initialize(self) -> None:
"""Async initialization that avoids blocking file IO."""
self.version = await self._read_manifest_version_async()
@@ -482,6 +486,65 @@ class BCSCore:
return candidate
return None
async def _ensure_backup_root(self) -> None:
"""Create backup root directory if needed."""
def _mkdir() -> None:
self._backup_root.mkdir(parents=True, exist_ok=True)
await self.hass.async_add_executor_job(_mkdir)
async def _backup_domain(self, domain: str) -> Path | None:
"""Backup an existing domain folder.
Returns the created backup path, or None if the domain folder does not exist.
"""
dest_root = Path(self.hass.config.path("custom_components"))
target = dest_root / domain
if not target.exists() or not target.is_dir():
return None
await self._ensure_backup_root()
ts = time.strftime("%Y%m%d_%H%M%S")
domain_root = self._backup_root / domain
backup_path = domain_root / ts
def _do_backup() -> None:
domain_root.mkdir(parents=True, exist_ok=True)
if backup_path.exists():
shutil.rmtree(backup_path, ignore_errors=True)
shutil.copytree(target, backup_path, dirs_exist_ok=True)
# Retention: keep only the newest N backups per domain.
try:
backups = [p for p in domain_root.iterdir() if p.is_dir()]
backups.sort(key=lambda p: p.name, reverse=True)
for old in backups[self._backup_keep_per_domain :]:
shutil.rmtree(old, ignore_errors=True)
except Exception:
# Never fail install/update because of retention cleanup.
pass
await self.hass.async_add_executor_job(_do_backup)
_LOGGER.info("BCS backup created: domain=%s path=%s", domain, backup_path)
return backup_path
async def _restore_domain_from_backup(self, domain: str, backup_path: Path) -> None:
"""Restore a domain folder from a backup."""
dest_root = Path(self.hass.config.path("custom_components"))
target = dest_root / domain
def _restore() -> None:
if not backup_path.exists() or not backup_path.is_dir():
return
if target.exists():
shutil.rmtree(target, ignore_errors=True)
shutil.copytree(backup_path, target, dirs_exist_ok=True)
await self.hass.async_add_executor_job(_restore)
_LOGGER.info("BCS rollback applied: domain=%s from=%s", domain, backup_path)
async def _copy_domain_dir(self, src_domain_dir: Path, domain: str) -> None:
dest_root = Path(self.hass.config.path("custom_components"))
target = dest_root / domain
@@ -613,65 +676,110 @@ class BCSCore:
_LOGGER.info("BCS install started: repo_id=%s ref=%s zip_url=%s", repo_id, ref, zip_url)
with tempfile.TemporaryDirectory(prefix="bcs_install_") as td:
tmp = Path(td)
zip_path = tmp / "repo.zip"
extract_dir = tmp / "extract"
extract_dir.mkdir(parents=True, exist_ok=True)
installed_domains: list[str] = []
backups: dict[str, Path] = {}
created_new: set[str] = set()
await self._download_zip(zip_url, zip_path)
await self._extract_zip(zip_path, extract_dir)
try:
with tempfile.TemporaryDirectory(prefix="bcs_install_") as td:
tmp = Path(td)
zip_path = tmp / "repo.zip"
extract_dir = tmp / "extract"
extract_dir.mkdir(parents=True, exist_ok=True)
cc_root = self._find_custom_components_root(extract_dir)
if not cc_root:
raise BCSInstallError("custom_components folder not found in repository ZIP")
await self._download_zip(zip_url, zip_path)
await self._extract_zip(zip_path, extract_dir)
installed_domains: list[str] = []
for domain_dir in cc_root.iterdir():
if not domain_dir.is_dir():
continue
manifest = domain_dir / "manifest.json"
if not manifest.exists():
continue
cc_root = self._find_custom_components_root(extract_dir)
if not cc_root:
raise BCSInstallError("custom_components folder not found in repository ZIP")
domain = domain_dir.name
await self._copy_domain_dir(domain_dir, domain)
installed_domains.append(domain)
dest_root = Path(self.hass.config.path("custom_components"))
if not installed_domains:
raise BCSInstallError("No integrations found under custom_components/ (missing manifest.json)")
for domain_dir in cc_root.iterdir():
if not domain_dir.is_dir():
continue
manifest = domain_dir / "manifest.json"
if not manifest.exists():
continue
installed_manifest_version = await self._read_installed_manifest_version(installed_domains[0])
installed_version = ref
domain = domain_dir.name
target = dest_root / domain
await self.storage.set_installed_repo(
repo_id=repo_id,
url=repo.url,
domains=installed_domains,
installed_version=installed_version,
installed_manifest_version=installed_manifest_version,
ref=ref,
)
await self._refresh_installed_cache()
# Backup only if we are going to overwrite an existing domain.
if target.exists() and target.is_dir():
bkp = await self._backup_domain(domain)
if bkp:
backups[domain] = bkp
else:
created_new.add(domain)
self._mark_restart_required()
await self._copy_domain_dir(domain_dir, domain)
installed_domains.append(domain)
_LOGGER.info(
"BCS install complete: repo_id=%s domains=%s installed_ref=%s manifest_version=%s",
repo_id,
installed_domains,
installed_version,
installed_manifest_version,
)
self.signal_updated()
return {
"ok": True,
"repo_id": repo_id,
"domains": installed_domains,
"installed_version": installed_version,
"installed_manifest_version": installed_manifest_version,
"restart_required": True,
}
if not installed_domains:
raise BCSInstallError("No integrations found under custom_components/ (missing manifest.json)")
installed_manifest_version = await self._read_installed_manifest_version(installed_domains[0])
installed_version = ref
await self.storage.set_installed_repo(
repo_id=repo_id,
url=repo.url,
domains=installed_domains,
installed_version=installed_version,
installed_manifest_version=installed_manifest_version,
ref=ref,
)
await self._refresh_installed_cache()
self._mark_restart_required()
_LOGGER.info(
"BCS install complete: repo_id=%s domains=%s installed_ref=%s manifest_version=%s",
repo_id,
installed_domains,
installed_version,
installed_manifest_version,
)
self.signal_updated()
return {
"ok": True,
"repo_id": repo_id,
"domains": installed_domains,
"installed_version": installed_version,
"installed_manifest_version": installed_manifest_version,
"restart_required": True,
}
except Exception as e:
# Roll back any domains we touched.
_LOGGER.error("BCS install failed, attempting rollback: repo_id=%s error=%s", repo_id, e)
dest_root = Path(self.hass.config.path("custom_components"))
# Restore backed-up domains.
for domain, bkp in backups.items():
try:
await self._restore_domain_from_backup(domain, bkp)
except Exception:
_LOGGER.debug("BCS rollback failed for domain=%s", domain, exc_info=True)
# Remove newly created domains if the install did not complete.
for domain in created_new:
try:
target = dest_root / domain
def _rm() -> None:
if target.exists() and target.is_dir():
shutil.rmtree(target, ignore_errors=True)
await self.hass.async_add_executor_job(_rm)
except Exception:
_LOGGER.debug("BCS cleanup failed for new domain=%s", domain, exc_info=True)
# Re-raise as install error for clean API response.
if isinstance(e, BCSInstallError):
raise
raise BCSInstallError(str(e)) from e
async def update_repo(self, repo_id: str) -> dict[str, Any]:
_LOGGER.info("BCS update started: repo_id=%s", repo_id)