custom_components/bahmcloud_store/storage.py gelöscht
This commit is contained in:
@@ -1,78 +0,0 @@
|
|||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import uuid
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from homeassistant.core import HomeAssistant
|
|
||||||
from homeassistant.helpers.storage import Store
|
|
||||||
|
|
||||||
_STORAGE_VERSION = 1
|
|
||||||
_STORAGE_KEY = "bcs_store"
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class CustomRepo:
|
|
||||||
id: str
|
|
||||||
url: str
|
|
||||||
name: str | None = None
|
|
||||||
|
|
||||||
|
|
||||||
class BCSStorage:
|
|
||||||
"""Persistent storage for manually added repositories."""
|
|
||||||
|
|
||||||
def __init__(self, hass: HomeAssistant) -> None:
|
|
||||||
self.hass = hass
|
|
||||||
self._store = Store(hass, _STORAGE_VERSION, _STORAGE_KEY)
|
|
||||||
|
|
||||||
async def _load(self) -> dict[str, Any]:
|
|
||||||
data = await self._store.async_load()
|
|
||||||
if not data:
|
|
||||||
return {"custom_repos": []}
|
|
||||||
if "custom_repos" not in data:
|
|
||||||
data["custom_repos"] = []
|
|
||||||
return data
|
|
||||||
|
|
||||||
async def _save(self, data: dict[str, Any]) -> None:
|
|
||||||
await self._store.async_save(data)
|
|
||||||
|
|
||||||
async def list_custom_repos(self) -> list[CustomRepo]:
|
|
||||||
data = await self._load()
|
|
||||||
repos = data.get("custom_repos", [])
|
|
||||||
out: list[CustomRepo] = []
|
|
||||||
for r in repos:
|
|
||||||
if not isinstance(r, dict):
|
|
||||||
continue
|
|
||||||
rid = str(r.get("id") or "")
|
|
||||||
url = str(r.get("url") or "")
|
|
||||||
name = r.get("name")
|
|
||||||
if rid and url:
|
|
||||||
out.append(CustomRepo(id=rid, url=url, name=str(name) if name else None))
|
|
||||||
return out
|
|
||||||
|
|
||||||
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
|
|
||||||
data = await self._load()
|
|
||||||
repos = data.get("custom_repos", [])
|
|
||||||
|
|
||||||
# Deduplicate by URL
|
|
||||||
for r in repos:
|
|
||||||
if isinstance(r, dict) and str(r.get("url", "")).strip() == url.strip():
|
|
||||||
# Update name if provided
|
|
||||||
if name:
|
|
||||||
r["name"] = name
|
|
||||||
await self._save(data)
|
|
||||||
return CustomRepo(id=str(r["id"]), url=str(r["url"]), name=r.get("name"))
|
|
||||||
|
|
||||||
rid = f"custom:{uuid.uuid4().hex[:10]}"
|
|
||||||
entry = {"id": rid, "url": url.strip(), "name": name.strip() if name else None}
|
|
||||||
repos.append(entry)
|
|
||||||
data["custom_repos"] = repos
|
|
||||||
await self._save(data)
|
|
||||||
return CustomRepo(id=rid, url=entry["url"], name=entry["name"])
|
|
||||||
|
|
||||||
async def remove_custom_repo(self, repo_id: str) -> None:
|
|
||||||
data = await self._load()
|
|
||||||
repos = data.get("custom_repos", [])
|
|
||||||
data["custom_repos"] = [r for r in repos if not (isinstance(r, dict) and r.get("id") == repo_id)]
|
|
||||||
await self._save(data)
|
|
||||||
|
|
||||||
Reference in New Issue
Block a user