# mirror of https://github.com/bahmcloud/owncloud-backup-ha.git
# synced 2026-04-06 13:31:15 +00:00
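"""Backup agent platform: store Home Assistant backups in ownCloud via WebDAV."""
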
from __future__ import annotations

import asyncio
import json
import logging
import os
import tempfile
from collections.abc import AsyncIterator, Callable, Coroutine
from dataclasses import asdict, is_dataclass
from typing import Any

from homeassistant.components.backup import (
    AgentBackup,
    BackupAgent,
    BackupAgentError,
    BackupNotFound,
)
from homeassistant.core import HomeAssistant, callback

from .const import (
    DATA_BACKUP_AGENT_LISTENERS,
    DATA_CLIENT,
    DOMAIN,
    META_SUFFIX,
    SPOOL_FLUSH_BYTES,
    TAR_PREFIX,
    TAR_SUFFIX,
)
from .webdav_client import WebDavClient

_LOGGER = logging.getLogger(__name__)
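
# Naming scheme: the tar archive and its JSON metadata sidecar share the
# f"{TAR_PREFIX}{backup_id}" stem and differ only in suffix, so listing and
# deletion can pair the two objects by name alone.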


def _make_tar_name(backup_id: str) -> str:
    """Return the remote object name of the tar archive for a backup id."""
    return f"{TAR_PREFIX}{backup_id}{TAR_SUFFIX}"


def _make_meta_name(backup_id: str) -> str:
    """Return the remote object name of the metadata sidecar for a backup id."""
    return f"{TAR_PREFIX}{backup_id}{META_SUFFIX}"


def _normalize_backup_dict(d: dict[str, Any]) -> dict[str, Any]:
    """Normalize backup metadata to satisfy multiple HA schema versions."""
    d = dict(d)

    # Identity fields (naming varies across HA versions)
    d.setdefault("backup_id", d.get("slug", ""))
    d.setdefault("slug", d.get("backup_id", ""))

    # Presentation fields
    d.setdefault("name", f"ownCloud backup ({d.get('backup_id') or d.get('slug') or 'unknown'})")
    d.setdefault("date", d.get("created_at", ""))
    d.setdefault("size", 0)
    d.setdefault("protected", False)
    d.setdefault("compressed", True)
    d.setdefault("extra_metadata", {})

    # Content selections
    d.setdefault("addons", [])
    d.setdefault("folders", [])

    # Older schema booleans
    d.setdefault("database", True)
    d.setdefault("homeassistant", True)

    # Newer schema booleans, derived from the older ones when absent
    d.setdefault("database_included", d.get("database", True))
    d.setdefault("homeassistant_included", d.get("homeassistant", True))
    d.setdefault("addons_included", bool(d.get("addons", [])))
    d.setdefault("folders_included", bool(d.get("folders", [])))

    # Keep the old keys consistent with the new ones
    d["database"] = bool(d.get("database_included", True))
    d["homeassistant"] = bool(d.get("homeassistant_included", True))

    if not isinstance(d.get("addons"), list):
        d["addons"] = []
    if not isinstance(d.get("folders"), list):
        d["folders"] = []

    return d
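
# Illustrative example (hypothetical input): a minimal dict such as
# {"backup_id": "abc123", "date": "2025-01-01T00:00:00+00:00"} comes back with
# "slug", "name", "size", "protected", the content lists, and both the old- and
# new-schema booleans filled in, so either AgentBackup shape accepts it.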


def _agentbackup_to_dict(backup: AgentBackup) -> dict[str, Any]:
    """Serialize AgentBackup in an HA-version-independent way."""
    if hasattr(backup, "to_dict"):
        raw = backup.to_dict()  # type: ignore[assignment]
    elif hasattr(backup, "as_dict"):
        raw = backup.as_dict()  # type: ignore[assignment]
    elif is_dataclass(backup):
        raw = asdict(backup)
    else:
        raw = {k: v for k, v in vars(backup).items() if not k.startswith("_")}

    return _normalize_backup_dict(raw)


def _agentbackup_from_dict(d: dict[str, Any]) -> AgentBackup:
    """Create AgentBackup from a dict, ensuring required keys exist."""
    d = _normalize_backup_dict(d)

    from_dict = getattr(AgentBackup, "from_dict", None)
    if callable(from_dict):
        return from_dict(d)  # type: ignore[misc]
    return AgentBackup(**d)  # type: ignore[arg-type]
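
# Round-trip intent: _agentbackup_from_dict(_agentbackup_to_dict(b)) should
# yield an equivalent AgentBackup on any supported HA version, since both
# directions funnel through _normalize_backup_dict.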


async def _spool_stream_to_tempfile(stream: AsyncIterator[bytes]) -> tuple[str, int]:
    """Spool an async byte stream into a temporary file and return (path, size)."""
    fd, path = tempfile.mkstemp(prefix="owncloud_backup_", suffix=".tar")
    os.close(fd)

    size = 0
    buf = bytearray()

    try:
        async for chunk in stream:
            if not chunk:
                continue
            buf.extend(chunk)
            size += len(chunk)

            # Flush in large batches to limit executor round-trips.
            if len(buf) >= SPOOL_FLUSH_BYTES:
                data = bytes(buf)
                buf.clear()
                await asyncio.to_thread(_write_bytes_to_file, path, data, append=True)

        if buf:
            await asyncio.to_thread(_write_bytes_to_file, path, bytes(buf), append=True)

        return path, size

    except Exception:
        # Don't leave a partial spool file behind on failure.
        try:
            os.remove(path)
        except OSError:
            pass
        raise


def _write_bytes_to_file(path: str, data: bytes, *, append: bool) -> None:
    """Write (or append) bytes to a file; runs in a worker thread."""
    mode = "ab" if append else "wb"
    with open(path, mode) as f:
        f.write(data)
        f.flush()
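
# Why spool at all: async_upload_backup below sends a single PUT with an
# explicit Content-Length rather than chunked transfer encoding, and spooling
# to disk is what makes that length known up front without buffering the whole
# archive in memory.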


class OwnCloudBackupAgent(BackupAgent):
    """Backup agent storing backups in ownCloud via WebDAV."""

    domain = DOMAIN
    name = "ownCloud (WebDAV)"
    # NOTE: a fixed unique_id assumes a single config entry; multiple entries
    # would currently share one agent identity.
    unique_id = "owncloud_webdav_backup_agent_v1"

    def __init__(self, client: WebDavClient) -> None:
        self._client = client

    async def async_upload_backup(
        self,
        *,
        open_stream: Callable[[], Coroutine[Any, Any, AsyncIterator[bytes]]],
        backup: AgentBackup,
        **kwargs: Any,
    ) -> None:
        """Upload a backup plus metadata sidecar (spooled for a non-chunked PUT)."""
        temp_path: str | None = None
        try:
            tar_name = _make_tar_name(backup.backup_id)
            meta_name = _make_meta_name(backup.backup_id)

            # 1) Spool the tar stream to a temp file
            stream = await open_stream()
            temp_path, size = await _spool_stream_to_tempfile(stream)

            # 2) Upload the tar file with a known Content-Length
            await self._client.put_file(tar_name, temp_path, size)

            # 3) Upload the normalized metadata JSON
            meta_dict = _agentbackup_to_dict(backup)
            meta_bytes = json.dumps(meta_dict, ensure_ascii=False).encode("utf-8")
            await self._client.put_bytes(meta_name, meta_bytes)

        except Exception as err:  # noqa: BLE001
            raise BackupAgentError(f"Upload to ownCloud failed: {err}") from err
        finally:
            if temp_path:
                try:
                    os.remove(temp_path)
                except OSError:
                    pass
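
    # Note on ordering: the tar goes up before the sidecar, so an interrupted
    # upload never leaves a sidecar advertising a missing tar; the reverse
    # case (tar without sidecar) is covered by the stat fallback in
    # async_list_backups.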

    async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
        """List backups by reading metadata sidecars; fall back to tar stat if missing."""
        try:
            names = await self._client.listdir()

            meta_files = {n for n in names if n.startswith(TAR_PREFIX) and n.endswith(META_SUFFIX)}
            tar_files = {n for n in names if n.startswith(TAR_PREFIX) and n.endswith(TAR_SUFFIX)}

            backups: list[AgentBackup] = []

            # Bound concurrent WebDAV requests while fetching sidecars.
            sem = asyncio.Semaphore(5)

            async def fetch_meta(meta_name: str) -> None:
                async with sem:
                    try:
                        raw = await self._client.get_bytes(meta_name)
                        d = json.loads(raw.decode("utf-8"))
                        backups.append(_agentbackup_from_dict(d))
                    except Exception as err:  # noqa: BLE001
                        # A sidecar that vanished or fails to parse should not
                        # fail the whole listing; the tar fallback below may
                        # still recover the backup.
                        _LOGGER.warning("Skipping invalid metadata %s: %s", meta_name, err)

            await asyncio.gather(*(fetch_meta(m) for m in meta_files))

            # Tars without a sidecar: synthesize metadata from a stat call.
            known_ids = {b.backup_id for b in backups}
            for tar_name in tar_files:
                backup_id = tar_name.removeprefix(TAR_PREFIX).removesuffix(TAR_SUFFIX)
                if backup_id in known_ids:
                    continue

                info = await self._client.stat(tar_name)
                d = _normalize_backup_dict(
                    {
                        "backup_id": backup_id,
                        "name": f"ownCloud backup ({backup_id})",
                        "date": info.get("modified_iso", ""),
                        "size": info.get("size", 0),
                        # conservative defaults:
                        "database_included": True,
                        "homeassistant_included": True,
                        "addons_included": False,
                        "folders_included": False,
                        "addons": [],
                        "folders": [],
                    }
                )
                backups.append(_agentbackup_from_dict(d))

            backups.sort(key=lambda b: str(b.date), reverse=True)
            return backups

        except Exception as err:  # noqa: BLE001
            raise BackupAgentError(f"Listing backups failed: {err}") from err

    async def async_get_backup(self, backup_id: str, **kwargs: Any) -> AgentBackup:
        """Return a single backup's metadata; fall back to tar stat if missing."""
        meta_name = _make_meta_name(backup_id)
        tar_name = _make_tar_name(backup_id)

        try:
            raw = await self._client.get_bytes(meta_name)
            d = json.loads(raw.decode("utf-8"))
            return _agentbackup_from_dict(d)
        except FileNotFoundError:
            pass
        except Exception as err:  # noqa: BLE001
            raise BackupAgentError(f"Get backup metadata failed: {err}") from err

        try:
            info = await self._client.stat(tar_name)
            d = _normalize_backup_dict(
                {
                    "backup_id": backup_id,
                    "name": f"ownCloud backup ({backup_id})",
                    "date": info.get("modified_iso", ""),
                    "size": info.get("size", 0),
                    "database_included": True,
                    "homeassistant_included": True,
                    "addons_included": False,
                    "folders_included": False,
                    "addons": [],
                    "folders": [],
                }
            )
            return _agentbackup_from_dict(d)
        except FileNotFoundError as err:
            raise BackupNotFound(f"Backup not found: {backup_id}") from err
        except Exception as err:  # noqa: BLE001
            raise BackupAgentError(f"Get backup failed: {err}") from err

    async def async_download_backup(self, backup_id: str, **kwargs: Any) -> AsyncIterator[bytes]:
        """Download the tar as an async byte iterator (used for restore)."""
        tar_name = _make_tar_name(backup_id)
        try:
            stream = await self._client.get_stream(tar_name)
            async for chunk in stream:
                yield chunk
        except FileNotFoundError as err:
            raise BackupNotFound(f"Backup not found: {backup_id}") from err
        except Exception as err:  # noqa: BLE001
            raise BackupAgentError(f"Download failed: {err}") from err
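
    # async_download_backup above is an async generator, so Home Assistant can
    # stream the archive chunk by chunk instead of buffering it in memory.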

    async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
        """Delete the tar and its metadata sidecar (best effort)."""
        tar_name = _make_tar_name(backup_id)
        meta_name = _make_meta_name(backup_id)

        tar_missing = False
        meta_missing = False

        try:
            await self._client.delete(tar_name)
        except FileNotFoundError:
            tar_missing = True

        try:
            await self._client.delete(meta_name)
        except FileNotFoundError:
            meta_missing = True

        # Only report "not found" when neither object existed.
        if tar_missing and meta_missing:
            raise BackupNotFound(f"Backup not found: {backup_id}")


async def async_get_backup_agents(hass: HomeAssistant) -> list[BackupAgent]:
    """Return a list of backup agents."""
    if DOMAIN not in hass.data:
        return []

    agents: list[BackupAgent] = []
    for entry_data in hass.data[DOMAIN].values():
        client: WebDavClient = entry_data[DATA_CLIENT]
        agents.append(OwnCloudBackupAgent(client))

    return agents
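
# Expected hass.data layout (a sketch inferred from the lookups above; the
# integration's setup code is the source of truth):
#
#     hass.data[DOMAIN] = {
#         entry.entry_id: {DATA_CLIENT: WebDavClient(...)},
#     }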


@callback
def async_register_backup_agents_listener(
    hass: HomeAssistant,
    *,
    listener: Callable[[], None],
    **kwargs: Any,
) -> Callable[[], None]:
    """Register a listener to be called when agents are added or removed."""
    hass.data.setdefault(DATA_BACKUP_AGENT_LISTENERS, []).append(listener)

    @callback
    def remove_listener() -> None:
        hass.data[DATA_BACKUP_AGENT_LISTENERS].remove(listener)

    return remove_listener
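
# How the listeners are used (a sketch, assuming the usual Home Assistant
# pattern; the actual notification site lives in the integration's setup and
# unload hooks, not in this module):
#
#     for notify in hass.data.get(DATA_BACKUP_AGENT_LISTENERS, []):
#         notify()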