Fix blueprint update backups
This commit is contained in:
2
.idea/changes.md
generated
2
.idea/changes.md
generated
@@ -19,6 +19,8 @@
|
||||
- Started the blueprint/category-aware installer preparation: installation metadata now distinguishes install type and installed paths, repo payloads expose install targets, and the active panel shows install-target context for future non-integration categories.
|
||||
- Added initial blueprint install-path handling groundwork so the codebase is no longer fully hard-wired to `custom_components/`.
|
||||
- Bumped the integration version from `0.7.4` to `0.7.5` and added the `0.7.5` release entry to `CHANGELOG.md` for blueprint support and the documentation refresh.
|
||||
- Fixed blueprint update backups: blueprint file updates now create content backups before overwrite and can roll back copied blueprint files if installation fails.
|
||||
- Kept the no-restart behavior for blueprints, because blueprint deployment does not normally require a Home Assistant restart.
|
||||
|
||||
### Documented
|
||||
- Captured the verified project identity from the repository and README files: Bahmcloud Store is a Home Assistant custom integration intended to behave like a provider-neutral store for custom integrations, similar to HACS but broader than GitHub-only workflows.
|
||||
|
||||
@@ -1335,6 +1335,11 @@ class BCSCore:
|
||||
|
||||
await self.hass.async_add_executor_job(_mkdir)
|
||||
|
||||
@staticmethod
|
||||
def _backup_repo_key(repo_id: str) -> str:
|
||||
safe = "".join(ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in str(repo_id or "").strip())
|
||||
return safe or "repo"
|
||||
|
||||
def _build_backup_meta(self, repo_id: str, domain: str) -> dict[str, object]:
|
||||
"""Build metadata for backup folders so restores can recover the stored version."""
|
||||
inst = self.get_installed(repo_id) or {}
|
||||
@@ -1391,6 +1396,68 @@ class BCSCore:
|
||||
_LOGGER.info("BCS backup created: domain=%s path=%s", domain, backup_path)
|
||||
return backup_path
|
||||
|
||||
async def _backup_paths(
|
||||
self,
|
||||
repo_id: str,
|
||||
relative_paths: list[str],
|
||||
*,
|
||||
meta: dict[str, object] | None = None,
|
||||
) -> Path | None:
|
||||
"""Backup arbitrary files under the Home Assistant config root."""
|
||||
|
||||
clean_paths = [str(p).strip().replace("\\", "/") for p in (relative_paths or []) if str(p).strip()]
|
||||
if not clean_paths:
|
||||
return None
|
||||
|
||||
cfg_root = Path(self.hass.config.path(""))
|
||||
existing: list[str] = []
|
||||
for rel in clean_paths:
|
||||
target = cfg_root / rel
|
||||
if target.exists():
|
||||
existing.append(rel)
|
||||
if not existing:
|
||||
return None
|
||||
|
||||
await self._ensure_backup_root()
|
||||
|
||||
ts = time.strftime("%Y%m%d_%H%M%S")
|
||||
repo_root = self._backup_root / "_content" / self._backup_repo_key(repo_id)
|
||||
backup_path = repo_root / ts
|
||||
|
||||
def _do_backup() -> None:
|
||||
repo_root.mkdir(parents=True, exist_ok=True)
|
||||
if backup_path.exists():
|
||||
shutil.rmtree(backup_path, ignore_errors=True)
|
||||
backup_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for rel in existing:
|
||||
src = cfg_root / rel
|
||||
dest = backup_path / rel
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
if src.is_file():
|
||||
shutil.copy2(src, dest)
|
||||
elif src.is_dir():
|
||||
shutil.copytree(src, dest, dirs_exist_ok=True)
|
||||
|
||||
if meta:
|
||||
try:
|
||||
meta_path = backup_path / BACKUP_META_FILENAME
|
||||
meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
backups = [p for p in repo_root.iterdir() if p.is_dir()]
|
||||
backups.sort(key=lambda p: p.name, reverse=True)
|
||||
for old in backups[self._backup_keep_per_domain :]:
|
||||
shutil.rmtree(old, ignore_errors=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
await self.hass.async_add_executor_job(_do_backup)
|
||||
_LOGGER.info("BCS content backup created: repo_id=%s path=%s files=%s", repo_id, backup_path, len(existing))
|
||||
return backup_path
|
||||
|
||||
async def _restore_domain_from_backup(self, domain: str, backup_path: Path) -> None:
|
||||
"""Restore a domain folder from a backup."""
|
||||
dest_root = Path(self.hass.config.path("custom_components"))
|
||||
@@ -1428,6 +1495,37 @@ class BCSCore:
|
||||
await self.hass.async_add_executor_job(_restore)
|
||||
_LOGGER.info("BCS rollback applied: domain=%s from=%s", domain, backup_path)
|
||||
|
||||
async def _restore_paths_from_backup(self, backup_path: Path, *, remove_targets: list[str] | None = None) -> None:
|
||||
"""Restore arbitrary backed up files under the Home Assistant config root."""
|
||||
|
||||
cfg_root = Path(self.hass.config.path(""))
|
||||
remove_list = [str(p).strip().replace("\\", "/") for p in (remove_targets or []) if str(p).strip()]
|
||||
|
||||
def _restore() -> None:
|
||||
if not backup_path.exists() or not backup_path.is_dir():
|
||||
return
|
||||
|
||||
for rel in remove_list:
|
||||
target = cfg_root / rel
|
||||
if target.exists():
|
||||
if target.is_dir():
|
||||
shutil.rmtree(target, ignore_errors=True)
|
||||
elif target.is_file():
|
||||
target.unlink(missing_ok=True)
|
||||
|
||||
for src in backup_path.rglob("*"):
|
||||
if not src.is_file():
|
||||
continue
|
||||
if src.name == BACKUP_META_FILENAME:
|
||||
continue
|
||||
rel = src.relative_to(backup_path)
|
||||
dest = cfg_root / rel
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
shutil.copy2(src, dest)
|
||||
|
||||
await self.hass.async_add_executor_job(_restore)
|
||||
_LOGGER.info("BCS content rollback applied: from=%s", backup_path)
|
||||
|
||||
async def list_repo_backups(self, repo_id: str) -> list[dict[str, Any]]:
|
||||
"""List available backup sets for an installed repository.
|
||||
|
||||
@@ -1806,6 +1904,7 @@ class BCSCore:
|
||||
installed_domains: list[str] = []
|
||||
installed_paths: list[str] = []
|
||||
backups: dict[str, Path] = {}
|
||||
content_backup: Path | None = None
|
||||
install_type = self._repo_install_type(repo)
|
||||
|
||||
inst_before = self.get_installed(repo_id) or {}
|
||||
@@ -1832,8 +1931,22 @@ class BCSCore:
|
||||
if not blueprints_root:
|
||||
raise BCSInstallError("blueprints folder not found in repository ZIP")
|
||||
|
||||
config_root = Path(self.hass.config.path(""))
|
||||
target_root = Path(self.hass.config.path("blueprints"))
|
||||
planned_paths: list[str] = []
|
||||
|
||||
for src in blueprints_root.rglob("*"):
|
||||
if not src.is_file():
|
||||
continue
|
||||
rel = src.relative_to(blueprints_root)
|
||||
planned_paths.append(str(Path("blueprints") / rel).replace("\\", "/"))
|
||||
|
||||
if not planned_paths:
|
||||
raise BCSInstallError("No blueprint files found under blueprints/")
|
||||
|
||||
m = dict(backup_meta)
|
||||
m["install_type"] = "blueprint"
|
||||
m["installed_paths"] = planned_paths
|
||||
content_backup = await self._backup_paths(repo_id, planned_paths, meta=m)
|
||||
|
||||
def _copy_blueprints() -> list[str]:
|
||||
copied: list[str] = []
|
||||
@@ -1849,8 +1962,6 @@ class BCSCore:
|
||||
return copied
|
||||
|
||||
installed_paths = await self.hass.async_add_executor_job(_copy_blueprints)
|
||||
if not installed_paths:
|
||||
raise BCSInstallError("No blueprint files found under blueprints/")
|
||||
else:
|
||||
cc_root = self._find_custom_components_root(extract_dir)
|
||||
if not cc_root:
|
||||
@@ -1934,6 +2045,12 @@ class BCSCore:
|
||||
except Exception:
|
||||
_LOGGER.debug("BCS rollback failed for domain=%s", domain, exc_info=True)
|
||||
|
||||
if content_backup is not None:
|
||||
try:
|
||||
await self._restore_paths_from_backup(content_backup, remove_targets=installed_paths)
|
||||
except Exception:
|
||||
_LOGGER.debug("BCS content rollback failed for repo_id=%s", repo_id, exc_info=True)
|
||||
|
||||
# Remove newly created domains if the install did not complete.
|
||||
for domain in created_new:
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user