0.6.0
This commit is contained in:
@@ -545,6 +545,130 @@ class BCSCore:
|
||||
await self.hass.async_add_executor_job(_restore)
|
||||
_LOGGER.info("BCS rollback applied: domain=%s from=%s", domain, backup_path)
|
||||
|
||||
async def list_repo_backups(self, repo_id: str) -> list[dict[str, Any]]:
|
||||
"""List available backup sets for an installed repository.
|
||||
|
||||
Returns a list of items sorted newest->oldest:
|
||||
{"id": "YYYYMMDD_HHMMSS", "label": "YYYY-MM-DD HH:MM:SS", "complete": bool, "domains": [...] }
|
||||
|
||||
A backup set is considered *complete* if the timestamp exists for all
|
||||
domains of the repository.
|
||||
"""
|
||||
inst = self.get_installed(repo_id) or {}
|
||||
domains = inst.get("domains") or []
|
||||
if not isinstance(domains, list) or not domains:
|
||||
return []
|
||||
|
||||
dom_list = [str(d) for d in domains if str(d).strip()]
|
||||
if not dom_list:
|
||||
return []
|
||||
|
||||
# Collect timestamps per domain.
|
||||
per_domain: dict[str, list[str]] = {}
|
||||
for d in dom_list:
|
||||
per_domain[d] = await self._list_domain_backup_ids(d)
|
||||
|
||||
# Build a map id -> domains where it exists
|
||||
id_map: dict[str, set[str]] = {}
|
||||
for d, ids in per_domain.items():
|
||||
for bid in ids:
|
||||
id_map.setdefault(bid, set()).add(d)
|
||||
|
||||
all_domains = set(dom_list)
|
||||
items: list[dict[str, Any]] = []
|
||||
for bid, present in id_map.items():
|
||||
complete = present == all_domains
|
||||
label = self._format_backup_id(bid)
|
||||
items.append({"id": bid, "label": label, "complete": complete, "domains": sorted(present)})
|
||||
|
||||
# Sort newest first by id (lexicographic works for timestamp format).
|
||||
items.sort(key=lambda x: str(x.get("id") or ""), reverse=True)
|
||||
|
||||
# Keep newest 5 entries overall (UI expects up to 5).
|
||||
return items[: self._backup_keep_per_domain]
|
||||
|
||||
async def restore_repo_backup(self, repo_id: str, backup_id: str) -> dict[str, Any]:
|
||||
"""Restore a previously created backup set for a repository."""
|
||||
repo_id = str(repo_id or "").strip()
|
||||
backup_id = str(backup_id or "").strip()
|
||||
if not repo_id:
|
||||
raise BCSInstallError("Missing repo_id")
|
||||
if not backup_id:
|
||||
raise BCSInstallError("Missing backup_id")
|
||||
|
||||
inst = self.get_installed(repo_id)
|
||||
if not inst:
|
||||
raise BCSInstallError("Repository is not installed")
|
||||
|
||||
domains = inst.get("domains") or []
|
||||
if not isinstance(domains, list) or not domains:
|
||||
raise BCSInstallError("No installed domains found")
|
||||
|
||||
dom_list = [str(d) for d in domains if str(d).strip()]
|
||||
if not dom_list:
|
||||
raise BCSInstallError("No installed domains found")
|
||||
|
||||
# Ensure the backup exists for all domains.
|
||||
missing: list[str] = []
|
||||
for d in dom_list:
|
||||
p = self._backup_root / d / backup_id
|
||||
if not p.exists() or not p.is_dir():
|
||||
missing.append(d)
|
||||
if missing:
|
||||
raise BCSInstallError(f"Selected backup is not available for all domains: missing={missing}")
|
||||
|
||||
async with self._install_lock:
|
||||
_LOGGER.info("BCS restore started: repo_id=%s backup_id=%s domains=%s", repo_id, backup_id, dom_list)
|
||||
|
||||
# Safety: create a new backup of current state before restoring.
|
||||
for d in dom_list:
|
||||
try:
|
||||
await self._backup_domain(d)
|
||||
except Exception:
|
||||
_LOGGER.debug("BCS pre-restore backup failed for domain=%s", d, exc_info=True)
|
||||
|
||||
# Apply restore.
|
||||
for d in dom_list:
|
||||
await self._restore_domain_from_backup(d, self._backup_root / d / backup_id)
|
||||
|
||||
await self._refresh_installed_cache()
|
||||
self._mark_restart_required()
|
||||
self.signal_updated()
|
||||
|
||||
_LOGGER.info("BCS restore complete: repo_id=%s backup_id=%s", repo_id, backup_id)
|
||||
return {"ok": True, "repo_id": repo_id, "backup_id": backup_id, "domains": dom_list, "restart_required": True}
|
||||
|
||||
async def _list_domain_backup_ids(self, domain: str) -> list[str]:
|
||||
"""List backup ids for a domain (newest->oldest)."""
|
||||
domain = str(domain or "").strip()
|
||||
if not domain:
|
||||
return []
|
||||
|
||||
root = self._backup_root / domain
|
||||
|
||||
def _list() -> list[str]:
|
||||
if not root.exists() or not root.is_dir():
|
||||
return []
|
||||
ids = [p.name for p in root.iterdir() if p.is_dir()]
|
||||
ids.sort(reverse=True)
|
||||
return ids
|
||||
|
||||
ids = await self.hass.async_add_executor_job(_list)
|
||||
return ids[: self._backup_keep_per_domain]
|
||||
|
||||
@staticmethod
|
||||
def _format_backup_id(backup_id: str) -> str:
|
||||
"""Format backup id YYYYMMDD_HHMMSS -> YYYY-MM-DD HH:MM:SS."""
|
||||
s = str(backup_id or "").strip()
|
||||
if len(s) != 15 or "_" not in s:
|
||||
return s
|
||||
try:
|
||||
d, t = s.split("_", 1)
|
||||
return f"{d[0:4]}-{d[4:6]}-{d[6:8]} {t[0:2]}:{t[2:4]}:{t[4:6]}"
|
||||
except Exception:
|
||||
return s
|
||||
|
||||
|
||||
async def _copy_domain_dir(self, src_domain_dir: Path, domain: str) -> None:
|
||||
dest_root = Path(self.hass.config.path("custom_components"))
|
||||
target = dest_root / domain
|
||||
|
||||
Reference in New Issue
Block a user