revert "custom_components/bahmcloud_store/core.py updated"
2026-01-15 18:02:30 +00:00
parent 618511be73
commit 132f9e27c1

@@ -1,268 +1,328 @@
"""Core logic for Bahmcloud Store (BCS).
Responsibilities:
- Load/parse central index (store.yaml)
- Merge index repositories + custom repositories
- Provider abstraction calls (GitHub/GitLab/Gitea/Bahmcloud)
- Metadata parsing
- Refresh pipeline & timers
- (Phase C.1) Install dry-run: validate repo existence, resolve domain, compute target path
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import os
import time
from dataclasses import dataclass
from typing import Any, Final
import aiohttp
from pathlib import Path
from typing import Any
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml
from .const import DOMAIN
from .providers import ProviderClient, build_provider_client
from .storage import BCSStorage, CustomRepo
from .providers import fetch_repo_info, detect_provider, RepoInfo, fetch_readme_markdown
from .metadata import fetch_repo_metadata, RepoMetadata
_LOGGER = logging.getLogger(__name__)
DEFAULT_REFRESH_SECONDS: Final[int] = 300
MANIFEST_PATH: Final[str] = "custom_components/{domain}/manifest.json"
DOMAIN = "bahmcloud_store"
-class RepoNotFoundError(Exception):
-    """Raised when a repo_id does not exist in current store state."""
+class BCSError(Exception):
+    """BCS core error."""
-class DomainResolutionError(Exception):
-    """Raised when domain could not be resolved from a repository."""
+@dataclass
+class BCSConfig:
+    store_url: str
-@dataclass(slots=True)
-class StoreRepo:
-    """Normalized repository entry used by BCS."""
+@dataclass
+class RepoItem:
    id: str
    name: str
    url: str
-    category: str | None = None
+    source: str  # "index" | "custom"
+    owner: str | None = None
+    provider: str | None = None
+    provider_repo_name: str | None = None
+    provider_description: str | None = None
+    default_branch: str | None = None
+    latest_version: str | None = None
+    latest_version_source: str | None = None  # "release" | "tag" | "atom" | None
+    meta_source: str | None = None
+    meta_name: str | None = None
+    meta_description: str | None = None
+    meta_category: str | None = None
+    meta_author: str | None = None
+    meta_maintainer: str | None = None
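The `meta_*` fields are filled in later by `_enrich_and_resolve`; the display-name precedence it applies (metadata name, then an existing user/index name, then the provider repo name, then the URL) can be distilled into a standalone sketch with hypothetical values:

```python
# Mirrors the name precedence applied during enrichment (see _enrich_and_resolve).
def resolve_display_name(
    current_name: str,
    url: str,
    meta_name: str | None,
    provider_repo_name: str | None,
) -> str:
    has_user_or_index_name = bool(current_name) and current_name != url and not current_name.startswith("http")
    if meta_name:
        return meta_name
    if not has_user_or_index_name and provider_repo_name:
        return provider_repo_name
    return current_name or url

# Hypothetical values: an entry whose name is still just its URL
# picks up the provider's repo name during enrichment.
print(resolve_display_name(
    "https://git.example.com/o/r", "https://git.example.com/o/r", None, "example_repo"
))  # -> example_repo
```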
-class BahmcloudStoreCore:
-    """BCS core object."""
-    def __init__(
-        self,
-        hass: HomeAssistant,
-        session: aiohttp.ClientSession,
-        index_url: str,
-    ) -> None:
+class BCSCore:
+    def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None:
        self.hass = hass
-        self.session = session
-        self.index_url = index_url
+        self.config = config
+        self.storage = BCSStorage(hass)
-        self.refresh_seconds: int = DEFAULT_REFRESH_SECONDS
+        self.refresh_seconds: int = 300
+        self.repos: dict[str, RepoItem] = {}
+        self._listeners: list[callable] = []
-        self._repos: dict[str, StoreRepo] = {}
-        self._lock = asyncio.Lock()
+        # Will be loaded asynchronously (no blocking IO in event loop)
+        self.version: str = "unknown"
-        # Provider clients are created per request based on repo url (provider-neutral).
-        # No persistent auth handling in Phase C.1.
+        # Diagnostics (helps verify refresh behavior)
+        self.last_index_url: str | None = None
+        self.last_index_bytes: int | None = None
+        self.last_index_hash: str | None = None
+        self.last_index_loaded_at: float | None = None
-    @property
-    def repos(self) -> dict[str, StoreRepo]:
-        """Return current normalized repo map."""
-        return self._repos
+    async def async_initialize(self) -> None:
+        """Async initialization that avoids blocking file IO."""
+        self.version = await self._read_manifest_version_async()
+    async def _read_manifest_version_async(self) -> str:
+        def _read() -> str:
+            try:
+                manifest_path = Path(__file__).resolve().parent / "manifest.json"
+                data = json.loads(manifest_path.read_text(encoding="utf-8"))
+                v = data.get("version")
+                return str(v) if v else "unknown"
+            except Exception:
+                return "unknown"
+        return await self.hass.async_add_executor_job(_read)
+    def add_listener(self, cb) -> None:
+        self._listeners.append(cb)
+    def signal_updated(self) -> None:
+        for cb in list(self._listeners):
+            try:
+                cb()
+            except Exception:
+                pass
+    async def full_refresh(self, source: str = "manual") -> None:
+        """Single refresh entry-point used by both timer and manual button."""
+        _LOGGER.info("BCS full refresh triggered (source=%s)", source)
+        await self.refresh()
+        self.signal_updated()
+    def get_repo(self, repo_id: str) -> RepoItem | None:
+        return self.repos.get(repo_id)
+    async def refresh(self) -> None:
+        index_repos, refresh_seconds = await self._load_index_repos()
+        self.refresh_seconds = refresh_seconds
+        custom_repos = await self.storage.list_custom_repos()
+        merged: dict[str, RepoItem] = {}
+        for item in index_repos:
+            merged[item.id] = item
+        for c in custom_repos:
+            merged[c.id] = RepoItem(
+                id=c.id,
+                name=(c.name or c.url),
+                url=c.url,
+                source="custom",
+            )
+        for r in merged.values():
+            r.provider = detect_provider(r.url)
+        await self._enrich_and_resolve(merged)
+        self.repos = merged
+        _LOGGER.info(
+            "BCS refresh complete: repos=%s (index=%s, custom=%s)",
+            len(self.repos),
+            len([r for r in self.repos.values() if r.source == "index"]),
+            len([r for r in self.repos.values() if r.source == "custom"]),
+        )
-    async def async_load_index(self) -> None:
-        """Load and parse the store.yaml from index repository."""
-        # NOTE: You likely already have real YAML parsing and merge logic.
-        # This implementation expects that your existing code handles this;
-        # here we provide a robust minimal version that can be replaced/merged.
-        _LOGGER.info("BCS index loading (url=%s)", self.index_url)
-        text = await self._http_get_text(self.index_url)
-        data = self._parse_yaml_minimal(text)
-        self.refresh_seconds = int(data.get("refresh_seconds", DEFAULT_REFRESH_SECONDS))
-        repos_raw = data.get("repos", [])
-        repos: dict[str, StoreRepo] = {}
-        for idx, item in enumerate(repos_raw):
-            if not isinstance(item, dict):
-                continue
-            name = str(item.get("name") or f"repo-{idx}")
-            url = str(item.get("url") or "")
-            category = item.get("category")
-            if not url:
-                continue
-            repo_id = str(item.get("id") or self._default_repo_id(url))
-            repos[repo_id] = StoreRepo(
-                id=repo_id,
-                name=name,
-                url=url,
-                category=str(category) if category is not None else None,
-            )
-        async with self._lock:
-            self._repos = repos
-        _LOGGER.info("BCS index parsed (repos=%d refresh_seconds=%d)", len(repos), self.refresh_seconds)
-    async def async_install_dry_run(self, repo_id: str) -> dict[str, Any]:
-        """Dry-run installation check for a repository.
-        Validates:
-        - repo exists
-        - domain can be resolved
-        - returns target path
-        Does not write any files (Phase C.1).
-        """
-        async with self._lock:
-            repo = self._repos.get(repo_id)
-        if repo is None:
-            raise RepoNotFoundError()
-        provider: ProviderClient = build_provider_client(self.session, repo.url)
-        domain = await self._resolve_domain(provider)
-        target_path = f"/config/custom_components/{domain}"
-        _LOGGER.info(
-            "BCS install dry-run resolved domain=%s target=%s (repo_id=%s)",
-            domain,
-            target_path,
-            repo_id,
-        )
-        return {"domain": domain, "target_path": target_path}
+    async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None:
+        sem = asyncio.Semaphore(6)
+        async def process_one(r: RepoItem) -> None:
+            async with sem:
+                info: RepoInfo = await fetch_repo_info(self.hass, r.url)
+                r.provider = info.provider or r.provider
+                r.owner = info.owner or r.owner
+                r.provider_repo_name = info.repo_name
+                r.provider_description = info.description
+                r.default_branch = info.default_branch or r.default_branch
+                r.latest_version = info.latest_version
+                r.latest_version_source = info.latest_version_source
+                md: RepoMetadata = await fetch_repo_metadata(self.hass, r.url, r.default_branch)
+                r.meta_source = md.source
+                r.meta_name = md.name
+                r.meta_description = md.description
+                r.meta_category = md.category
+                r.meta_author = md.author
+                r.meta_maintainer = md.maintainer
+                has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http"))
+                if r.meta_name:
+                    r.name = r.meta_name
+                elif not has_user_or_index_name and r.provider_repo_name:
+                    r.name = r.provider_repo_name
+                elif not r.name:
+                    r.name = r.url
+        await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True)
-    async def _resolve_domain(self, provider: ProviderClient) -> str:
-        """Resolve HA integration domain from repository.
-        Strategy (Phase C.1):
-        - Fetch raw manifest.json from default branch
-        - Parse domain field
-        If not resolvable, raise DomainResolutionError.
-        """
-        default_branch = await provider.async_get_default_branch()
-        manifest_relpath = "custom_components/bahmcloud_store/manifest.json"
-        # We don't know the component path of third-party repos.
-        # For BCS-installed integrations, HACS-like repos usually have:
-        # - custom_components/<domain>/manifest.json
-        #
-        # Phase C.1: we try to find domain by searching common manifest locations.
-        # (Still no file writes; only raw fetch.)
-        candidates = [
-            "manifest.json",
-            "custom_components/manifest.json",  # rare
-        ]
-        # Try to locate custom_components/<something>/manifest.json by scanning a small list index file.
-        # Provider-neutral listing APIs differ, so we avoid directory listing here.
-        # Instead we probe a few common patterns with heuristics:
-        # 1) root manifest.json
-        # 2) custom_components/<repo_name>/manifest.json (guess)
-        # 3) custom_components/<domain>/manifest.json is not guessable without listing
-        #
-        # For now we implement a stable behavior:
-        # - fetch root manifest.json
-        # - if absent, try guessed custom_components/<slug>/manifest.json from repo name inferred by provider
-        #
-        # This is acceptable for Phase C.1 dry-run; Phase C.2 will use the extracted ZIP to validate.
-        # 1) root manifest.json
-        domain = await self._try_manifest_domain(provider, default_branch, "manifest.json")
-        if domain:
-            return domain
-        # 2) guess slug from repo url last path component
-        slug = provider.repo_slug
-        if slug:
-            guess_path = MANIFEST_PATH.format(domain=slug)
-            domain = await self._try_manifest_domain(provider, default_branch, guess_path)
-            if domain:
-                return domain
-        # 3) fallback: try to read bcs/hacs metadata for domain (optional)
-        # If your metadata.py already extracts "domain", you can wire it in here.
-        # We keep Phase C.1 strict: domain must be resolvable.
-        raise DomainResolutionError(
-            "Unable to resolve integration domain (missing/invalid manifest.json)"
-        )
+    def _add_cache_buster(self, url: str) -> str:
+        parts = urlsplit(url)
+        q = dict(parse_qsl(parts.query, keep_blank_values=True))
+        q["t"] = str(int(time.time()))
+        new_query = urlencode(q)
+        return urlunsplit((parts.scheme, parts.netloc, parts.path, new_query, parts.fragment))
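The removed comments describe a probe-based heuristic: try a short list of candidate manifest paths and take the first parsable `domain`. As a standalone sketch, with a hypothetical `fetch_raw` callable standing in for the provider's raw-file API:

```python
# Sketch of the removed probing heuristic. `fetch_raw` is a hypothetical
# stand-in that returns file text or raises FileNotFoundError.
import json
from typing import Callable

def probe_domain(fetch_raw: Callable[[str], str], repo_slug: str | None) -> str | None:
    candidates = ["manifest.json"]
    if repo_slug:
        candidates.append(f"custom_components/{repo_slug}/manifest.json")
    for path in candidates:
        try:
            data = json.loads(fetch_raw(path))
        except (FileNotFoundError, ValueError):
            continue
        if not isinstance(data, dict):
            continue
        domain = data.get("domain")
        if isinstance(domain, str) and domain.strip():
            return domain.strip()
    return None

# Hypothetical repo content for illustration:
files = {"custom_components/example/manifest.json": '{"domain": "example"}'}

def fake_fetch(path: str) -> str:
    if path in files:
        return files[path]
    raise FileNotFoundError(path)

print(probe_domain(fake_fetch, "example"))  # -> example
```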
+    def _gitea_src_to_raw(self, url: str) -> str:
+        parts = urlsplit(url)
+        path = parts.path
+        path2 = path.replace("/src/branch/", "/raw/branch/")
+        if path2 == path:
+            return url
+        return urlunsplit((parts.scheme, parts.netloc, path2, parts.query, parts.fragment))
+    async def _fetch_store_text(self, url: str) -> str:
+        session = async_get_clientsession(self.hass)
+        headers = {
+            "User-Agent": "BahmcloudStore (Home Assistant)",
+            "Cache-Control": "no-cache, no-store, max-age=0",
+            "Pragma": "no-cache",
+            "Expires": "0",
+        }
+        async with session.get(url, timeout=30, headers=headers) as resp:
+            if resp.status != 200:
+                raise BCSError(f"store_url returned {resp.status}")
+            return await resp.text()
-    async def _try_manifest_domain(
-        self, provider: ProviderClient, branch: str, path: str
-    ) -> str | None:
-        """Try to fetch and parse manifest.json from a given path."""
-        try:
-            raw = await provider.async_get_raw_file(path, branch=branch)
-        except FileNotFoundError:
-            return None
-        except Exception as err:  # pylint: disable=broad-exception-caught
-            _LOGGER.debug("BCS manifest probe failed (%s): %s", path, err)
-            return None
-        try:
-            data = json.loads(raw)
-        except Exception:  # pylint: disable=broad-exception-caught
-            return None
-        domain = data.get("domain")
-        if isinstance(domain, str) and domain.strip():
-            return domain.strip()
-        return None
-    async def _http_get_text(self, url: str) -> str:
-        """GET url and return text."""
-        async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
-            resp.raise_for_status()
-            return await resp.text()
-    def _default_repo_id(self, url: str) -> str:
-        """Build a stable-ish repo_id from URL."""
-        # Keep simple and deterministic:
-        # - strip protocol
-        # - strip trailing .git
-        # - use host/path
-        u = url.strip()
-        u = u.replace("https://", "").replace("http://", "")
-        if u.endswith(".git"):
-            u = u[:-4]
-        return u
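`_add_cache_buster` and `_gitea_src_to_raw` are pure string transforms, so their behavior is easy to demonstrate; the store URL below is hypothetical:

```python
# Demo of the two pure URL helpers on a hypothetical Gitea link.
import time
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit

def gitea_src_to_raw(url: str) -> str:
    parts = urlsplit(url)
    path2 = parts.path.replace("/src/branch/", "/raw/branch/")
    if path2 == parts.path:
        return url
    return urlunsplit((parts.scheme, parts.netloc, path2, parts.query, parts.fragment))

def add_cache_buster(url: str) -> str:
    parts = urlsplit(url)
    q = dict(parse_qsl(parts.query, keep_blank_values=True))
    q["t"] = str(int(time.time()))
    return urlunsplit((parts.scheme, parts.netloc, parts.path, urlencode(q), parts.fragment))

url = "https://git.example.com/owner/store/src/branch/main/store.yaml"
print(gitea_src_to_raw(url))   # .../raw/branch/main/store.yaml
print(add_cache_buster(url))   # ...store.yaml?t=<unix time>
```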
+    async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
+        store_url = (self.config.store_url or "").strip()
+        if not store_url:
+            raise BCSError("store_url is empty")
+        url = self._add_cache_buster(store_url)
+        try:
+            raw = await self._fetch_store_text(url)
+            # If we fetched a HTML page (wrong endpoint), attempt raw conversion.
+            if "<html" in raw.lower() or "<!doctype html" in raw.lower():
+                fallback = self._add_cache_buster(self._gitea_src_to_raw(store_url))
+                if fallback != url:
+                    _LOGGER.warning("BCS store index looked like HTML, retrying raw URL")
+                    raw = await self._fetch_store_text(fallback)
+                    url = fallback
+        except Exception as e:
+            raise BCSError(f"Failed fetching store index: {e}") from e
+        # Diagnostics
+        b = raw.encode("utf-8", errors="replace")
+        h = hashlib.sha256(b).hexdigest()[:12]
+        self.last_index_url = url
+        self.last_index_bytes = len(b)
+        self.last_index_hash = h
+        self.last_index_loaded_at = time.time()
+        _LOGGER.info(
+            "BCS index loaded: url=%s bytes=%s sha=%s",
+            self.last_index_url,
+            self.last_index_bytes,
+            self.last_index_hash,
+        )
+        try:
+            data = ha_yaml.parse_yaml(raw)
+            if not isinstance(data, dict):
+                raise BCSError("store.yaml must be a mapping")
+            refresh_seconds = int(data.get("refresh_seconds", 300))
+            repos = data.get("repos", [])
+            if not isinstance(repos, list):
+                raise BCSError("store.yaml 'repos' must be a list")
+            items: list[RepoItem] = []
+            for i, r in enumerate(repos):
+                if not isinstance(r, dict):
+                    continue
+                repo_url = str(r.get("url", "")).strip()
+                if not repo_url:
+                    continue
+                name = str(r.get("name") or repo_url).strip()
+                items.append(
+                    RepoItem(
+                        id=f"index:{i}",
+                        name=name,
+                        url=repo_url,
+                        source="index",
+                    )
+                )
+            _LOGGER.info("BCS index parsed: repos=%s refresh_seconds=%s", len(items), refresh_seconds)
+            return items, refresh_seconds
+        except Exception as e:
+            raise BCSError(f"Invalid store.yaml: {e}") from e
-    def _parse_yaml_minimal(self, text: str) -> dict[str, Any]:
-        """Minimal YAML parser fallback.
-        NOTE: You very likely already use PyYAML/ruamel in your real code.
-        This exists only to keep this file self-contained.
-        """
-        # Home Assistant ships with PyYAML internally in many environments,
-        # but we avoid hard dependency assumptions here.
-        try:
-            import yaml  # type: ignore
-        except Exception as err:  # pylint: disable=broad-exception-caught
-            raise RuntimeError("YAML parser not available") from err
-        data = yaml.safe_load(text) or {}
-        if not isinstance(data, dict):
-            return {}
-        return data
-async def async_setup_core(hass: HomeAssistant, session: aiohttp.ClientSession, index_url: str) -> BahmcloudStoreCore:
-    """Create and store the core instance."""
-    core = BahmcloudStoreCore(hass=hass, session=session, index_url=index_url)
-    hass.data.setdefault(DOMAIN, {})
-    hass.data[DOMAIN]["core"] = core
-    return core
+    async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
+        url = str(url or "").strip()
+        if not url:
+            raise BCSError("Missing url")
+        c = await self.storage.add_custom_repo(url, name)
+        await self.full_refresh(source="custom_repo_add")
+        return c
+    async def remove_custom_repo(self, repo_id: str) -> None:
+        await self.storage.remove_custom_repo(repo_id)
+        await self.full_refresh(source="custom_repo_remove")
+    async def list_custom_repos(self) -> list[CustomRepo]:
+        return await self.storage.list_custom_repos()
+    def list_repos_public(self) -> list[dict[str, Any]]:
+        out: list[dict[str, Any]] = []
+        for r in self.repos.values():
+            out.append(
+                {
+                    "id": r.id,
+                    "name": r.name,
+                    "url": r.url,
+                    "source": r.source,
+                    "owner": r.owner,
+                    "provider": r.provider,
+                    "repo_name": r.provider_repo_name,
+                    "description": r.provider_description or r.meta_description,
+                    "default_branch": r.default_branch,
+                    "latest_version": r.latest_version,
+                    "latest_version_source": r.latest_version_source,
+                    "category": r.meta_category,
+                    "meta_author": r.meta_author,
+                    "meta_maintainer": r.meta_maintainer,
+                    "meta_source": r.meta_source,
+                }
+            )
+        return out
+    async def fetch_readme_markdown(self, repo_id: str) -> str | None:
+        repo = self.get_repo(repo_id)
+        if not repo:
+            return None
+        return await fetch_readme_markdown(
+            self.hass,
+            repo.url,
+            provider=repo.provider,
+            default_branch=repo.default_branch,
+        )
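For consumers of `list_repos_public()`, a sketch of the emitted shape; the entry values (including the `provider` string) are hypothetical, but the keys match the dict built above:

```python
# Sketch of consuming list_repos_public() output; the entry is hypothetical.
entries = [
    {
        "id": "index:0",
        "name": "Example Integration",
        "url": "https://git.example.com/owner/example_integration",
        "source": "index",
        "owner": "owner",
        "provider": "gitea",
        "repo_name": "example_integration",
        "description": "An example",
        "default_branch": "main",
        "latest_version": "1.2.3",
        "latest_version_source": "release",
        "category": "integration",
        "meta_author": None,
        "meta_maintainer": None,
        "meta_source": None,
    }
]
custom_only = [e for e in entries if e["source"] == "custom"]
with_version = [e for e in entries if e["latest_version"]]
print(len(custom_only), len(with_version))  # -> 0 1
```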