custom_components/bahmcloud_store/core.py hinzugefügt

This commit is contained in:
2026-01-15 06:10:29 +00:00
parent 2da0cfe07d
commit 806524ad33

View File

@@ -0,0 +1,197 @@
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml
from .storage import BCSStorage, CustomRepo
from .views import StaticAssetsView, BCSApiView
from .custom_repo_view import BCSCustomRepoView
_LOGGER = logging.getLogger(__name__)
DOMAIN = "bahmcloud_store"
class BCSError(Exception):
"""BCS core error."""
@dataclass
class BCSConfig:
store_url: str
@dataclass
class RepoItem:
"""Merged repository item (from store.yaml + custom repos)."""
id: str
name: str
url: str
source: str # "index" | "custom"
# Enrichment fields (filled later)
owner: str | None = None
description: str | None = None
provider: str | None = None
class BCSCore:
"""Core services for BCS: indexing + storage + HTTP views."""
def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None:
self.hass = hass
self.config = config
self.storage = BCSStorage(hass)
self.refresh_seconds: int = 300
self.repos: dict[str, RepoItem] = {}
self._listeners: list[callable] = []
def add_listener(self, cb) -> None:
self._listeners.append(cb)
def signal_updated(self) -> None:
for cb in list(self._listeners):
try:
cb()
except Exception:
pass
async def register_http_views(self) -> None:
"""Register static assets and API routes."""
self.hass.http.register_view(StaticAssetsView())
self.hass.http.register_view(BCSApiView(self))
self.hass.http.register_view(BCSCustomRepoView(self))
@staticmethod
def _provider_name(repo_url: str) -> str:
"""Best-effort provider detection by hostname."""
host = urlparse(repo_url).netloc.lower()
if "github.com" in host:
return "github"
if "gitlab.com" in host:
return "gitlab"
# Likely self-hosted (Gitea/GitLab/other)
return "generic"
@staticmethod
def _owner_from_url(repo_url: str) -> str | None:
"""Extract owner/group from common URL patterns."""
u = urlparse(repo_url.rstrip("/"))
parts = u.path.strip("/").split("/")
if len(parts) >= 2:
return parts[0]
return None
async def refresh(self) -> None:
"""Refresh merged repo list (index + custom)."""
index_repos, refresh_seconds = await self._load_index_repos()
self.refresh_seconds = refresh_seconds
custom_repos = await self.storage.list_custom_repos()
merged: dict[str, RepoItem] = {}
# Index repos
for item in index_repos:
merged[item.id] = item
# Custom repos override by id (or add)
for c in custom_repos:
merged[c.id] = RepoItem(
id=c.id,
name=c.name or c.url,
url=c.url,
source="custom",
)
# Enrich basic data (owner/provider); description will come later via provider APIs
for r in merged.values():
r.provider = self._provider_name(r.url)
r.owner = self._owner_from_url(r.url)
self.repos = merged
async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
"""Load store.yaml and return (repos, refresh_seconds)."""
session = async_get_clientsession(self.hass)
try:
async with session.get(self.config.store_url, timeout=20) as resp:
if resp.status != 200:
raise BCSError(f"store_url returned {resp.status}")
raw = await resp.text()
except Exception as e:
raise BCSError(f"Failed fetching store index: {e}") from e
try:
data = ha_yaml.parse_yaml(raw)
if not isinstance(data, dict):
raise BCSError("store.yaml must be a mapping")
refresh_seconds = int(data.get("refresh_seconds", 300))
repos = data.get("repos", [])
if not isinstance(repos, list):
raise BCSError("store.yaml 'repos' must be a list")
items: list[RepoItem] = []
for i, r in enumerate(repos):
if not isinstance(r, dict):
continue
url = str(r.get("url", "")).strip()
if not url:
continue
name = str(r.get("name") or url).strip()
rid = f"index:{i}"
items.append(
RepoItem(
id=rid,
name=name,
url=url,
source="index",
)
)
return items, refresh_seconds
except Exception as e:
raise BCSError(f"Invalid store.yaml: {e}") from e
# --- Custom repo management (used by API) ---
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
repo = await self.storage.add_custom_repo(url=url, name=name)
await self.refresh()
self.signal_updated()
return repo
async def remove_custom_repo(self, repo_id: str) -> None:
await self.storage.remove_custom_repo(repo_id)
await self.refresh()
self.signal_updated()
async def list_custom_repos(self) -> list[CustomRepo]:
return await self.storage.list_custom_repos()
def list_repos_public(self) -> list[dict[str, Any]]:
"""Return repo list for UI."""
out: list[dict[str, Any]] = []
for r in self.repos.values():
out.append(
{
"id": r.id,
"name": r.name,
"url": r.url,
"source": r.source,
"owner": r.owner,
"provider": r.provider,
"description": r.description,
}
)
return out