Mirror of https://github.com/bahmcloud/owncloud-backup-ha.git, synced 2026-04-06 13:31:15 +00:00
Implement spooling of byte stream to temp file
Added functionality to spool an async byte stream to a temporary file so uploads can be sent with an explicit Content-Length instead of chunked transfer encoding.
@@ -3,6 +3,8 @@ from __future__ import annotations
 import asyncio
 import json
 import logging
+import os
+import tempfile
 from collections.abc import AsyncIterator, Callable, Coroutine
 from typing import Any
 
@@ -20,6 +22,7 @@ from .const import (
     DATA_CLIENT,
     DOMAIN,
     META_SUFFIX,
+    SPOOL_FLUSH_BYTES,
     TAR_PREFIX,
     TAR_SUFFIX,
 )
@@ -44,6 +47,50 @@ def _agentbackup_from_dict(d: dict[str, Any]) -> AgentBackup:
     return AgentBackup(**d) # type: ignore[arg-type]
 
 
+async def _spool_stream_to_tempfile(stream: AsyncIterator[bytes]) -> tuple[str, int]:
+    """Spool an async byte stream into a temporary file and return (path, size).
+
+    This avoids chunked WebDAV uploads and improves compatibility with reverse proxies.
+    """
+    fd, path = tempfile.mkstemp(prefix="owncloud_backup_", suffix=".tar")
+    os.close(fd)
+
+    size = 0
+    buf = bytearray()
+
+    try:
+        async for chunk in stream:
+            if not chunk:
+                continue
+            buf.extend(chunk)
+            size += len(chunk)
+
+            if len(buf) >= SPOOL_FLUSH_BYTES:
+                data = bytes(buf)
+                buf.clear()
+                await asyncio.to_thread(_write_bytes_to_file, path, data, append=True)
+
+        if buf:
+            await asyncio.to_thread(_write_bytes_to_file, path, bytes(buf), append=True)
+
+        return path, size
+
+    except Exception:
+        # Ensure no leftovers on failure
+        try:
+            os.remove(path)
+        except OSError:
+            pass
+        raise
+
+
+def _write_bytes_to_file(path: str, data: bytes, *, append: bool) -> None:
+    mode = "ab" if append else "wb"
+    with open(path, mode) as f:
+        f.write(data)
+        f.flush()
+
+
 class OwnCloudBackupAgent(BackupAgent):
     """Backup agent storing backups in ownCloud via WebDAV."""
 
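Note: the new helper can be sanity-checked in isolation by driving it with a synthetic stream and comparing the reported size against the spooled file. A minimal sketch, assuming the module is importable; the import path, fake_stream, and the 4 MiB payload are illustrative, not part of this commit:

import asyncio
import os
from collections.abc import AsyncIterator

# Hypothetical import path; the diff does not show where the module lives.
from custom_components.owncloud_backup.agent import _spool_stream_to_tempfile


async def fake_stream(total: int, chunk_size: int = 64 * 1024) -> AsyncIterator[bytes]:
    """Yield `total` zero bytes in fixed-size chunks, like open_stream() would."""
    sent = 0
    while sent < total:
        n = min(chunk_size, total - sent)
        yield b"\x00" * n
        sent += n


async def main() -> None:
    path, size = await _spool_stream_to_tempfile(fake_stream(4 * 1024 * 1024))
    try:
        # The reported size should match both the stream and the file on disk.
        assert size == 4 * 1024 * 1024
        assert os.path.getsize(path) == size
    finally:
        os.remove(path)  # the caller owns the spooled file


asyncio.run(main())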
@@ -61,21 +108,35 @@ class OwnCloudBackupAgent(BackupAgent):
         backup: AgentBackup,
         **kwargs: Any,
     ) -> None:
-        """Upload a backup + metadata sidecar."""
+        """Upload a backup + metadata sidecar.
+
+        To avoid chunked uploads (which often break behind proxies), we spool
+        the stream to a temp file and upload with a Content-Length.
+        """
+        temp_path: str | None = None
         try:
             tar_name = _make_tar_name(backup.backup_id)
             meta_name = _make_meta_name(backup.backup_id)
 
-            # 1) Upload tar stream
+            # 1) Spool tar stream to temp file
             stream = await open_stream()
-            await self._client.put_stream(tar_name, stream)
+            temp_path, size = await _spool_stream_to_tempfile(stream)
 
-            # 2) Upload metadata JSON (small)
+            # 2) Upload tar file with Content-Length
+            await self._client.put_file(tar_name, temp_path, size)
+
+            # 3) Upload metadata JSON (small)
             meta_bytes = json.dumps(backup.to_dict(), ensure_ascii=False).encode("utf-8")
             await self._client.put_bytes(meta_name, meta_bytes)
 
         except Exception as err: # noqa: BLE001
             raise BackupAgentError(f"Upload to ownCloud failed: {err}") from err
+        finally:
+            if temp_path:
+                try:
+                    os.remove(temp_path)
+                except OSError:
+                    pass
 
     async def async_list_backups(self, **kwargs: Any) -> list[AgentBackup]:
         """List backups by reading metadata sidecars; fallback to tar stat if missing."""
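Note: put_file itself is not part of this diff. A minimal sketch of what it could look like with aiohttp (Home Assistant's usual HTTP stack); the client class shape, attribute names, and URL layout are assumptions, and auth and error mapping are omitted. Handing aiohttp a seekable file object lets it derive the body size up front, so the PUT should go out with a Content-Length header rather than Transfer-Encoding: chunked:

import os

import aiohttp


class OwnCloudClient:
    """Hypothetical client shape; the real class is defined elsewhere in the repo."""

    def __init__(self, session: aiohttp.ClientSession, base_url: str) -> None:
        self._session = session
        self._base_url = base_url.rstrip("/")

    async def put_file(self, name: str, path: str, size: int) -> None:
        """PUT a spooled file whose size is known up front."""
        if os.path.getsize(path) != size:
            raise ValueError("spooled file size mismatch")
        url = f"{self._base_url}/{name}"
        with open(path, "rb") as f:
            # aiohttp sizes the payload from the seekable file, so the
            # request carries Content-Length instead of chunked encoding.
            async with self._session.put(url, data=f) as resp:
                resp.raise_for_status()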
@@ -217,3 +278,4 @@ def async_register_backup_agents_listener(
         hass.data[DATA_BACKUP_AGENT_LISTENERS].remove(listener)
 
     return remove_listener
+