From 4e8116265d1fd8efc19e6f835a557388c3228eb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Bachmann?= Date: Thu, 15 Jan 2026 14:07:19 +0000 Subject: [PATCH] =?UTF-8?q?custom=5Fcomponents/bahmcloud=5Fstore/core.py?= =?UTF-8?q?=20gel=C3=B6scht?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- custom_components/bahmcloud_store/core.py | 323 ---------------------- 1 file changed, 323 deletions(-) delete mode 100644 custom_components/bahmcloud_store/core.py diff --git a/custom_components/bahmcloud_store/core.py b/custom_components/bahmcloud_store/core.py deleted file mode 100644 index 3028c3c..0000000 --- a/custom_components/bahmcloud_store/core.py +++ /dev/null @@ -1,323 +0,0 @@ -from __future__ import annotations - -import asyncio -import json -import logging -from dataclasses import dataclass -from pathlib import Path -from typing import Any -from urllib.parse import urlparse - -from homeassistant.core import HomeAssistant -from homeassistant.helpers.aiohttp_client import async_get_clientsession -from homeassistant.util import yaml as ha_yaml - -from .storage import BCSStorage, CustomRepo -from .views import StaticAssetsView, BCSApiView, BCSReadmeView -from .custom_repo_view import BCSCustomRepoView -from .providers import fetch_repo_info, detect_provider, RepoInfo -from .metadata import fetch_repo_metadata, RepoMetadata - -_LOGGER = logging.getLogger(__name__) - -DOMAIN = "bahmcloud_store" - - -class BCSError(Exception): - """BCS core error.""" - - -@dataclass -class BCSConfig: - store_url: str - - -@dataclass -class RepoItem: - id: str - name: str - url: str - source: str # "index" | "custom" - - owner: str | None = None - provider: str | None = None - provider_repo_name: str | None = None - provider_description: str | None = None - default_branch: str | None = None - - latest_version: str | None = None - latest_version_source: str | None = None # "release" | "tag" | None - - meta_source: str | None = None - meta_name: str | None = None - meta_description: str | None = None - meta_category: str | None = None - meta_author: str | None = None - meta_maintainer: str | None = None - - -class BCSCore: - def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None: - self.hass = hass - self.config = config - self.storage = BCSStorage(hass) - - self.refresh_seconds: int = 300 - self.repos: dict[str, RepoItem] = {} - self._listeners: list[callable] = [] - - self.version: str = self._read_manifest_version() - - def _read_manifest_version(self) -> str: - try: - manifest_path = Path(__file__).resolve().parent / "manifest.json" - data = json.loads(manifest_path.read_text(encoding="utf-8")) - v = data.get("version") - return str(v) if v else "unknown" - except Exception: - return "unknown" - - def add_listener(self, cb) -> None: - self._listeners.append(cb) - - def signal_updated(self) -> None: - for cb in list(self._listeners): - try: - cb() - except Exception: - pass - - async def register_http_views(self) -> None: - self.hass.http.register_view(StaticAssetsView()) - self.hass.http.register_view(BCSApiView(self)) - self.hass.http.register_view(BCSReadmeView(self)) - self.hass.http.register_view(BCSCustomRepoView(self)) - - def get_repo(self, repo_id: str) -> RepoItem | None: - return self.repos.get(repo_id) - - async def refresh(self) -> None: - index_repos, refresh_seconds = await self._load_index_repos() - self.refresh_seconds = refresh_seconds - - custom_repos = await self.storage.list_custom_repos() - - merged: dict[str, RepoItem] = {} - - for item in index_repos: - merged[item.id] = item - - for c in custom_repos: - merged[c.id] = RepoItem( - id=c.id, - name=(c.name or c.url), - url=c.url, - source="custom", - ) - - for r in merged.values(): - r.provider = detect_provider(r.url) - - await self._enrich_and_resolve(merged) - self.repos = merged - - async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None: - sem = asyncio.Semaphore(6) - - async def process_one(r: RepoItem) -> None: - async with sem: - info: RepoInfo = await fetch_repo_info(self.hass, r.url) - - r.provider = info.provider or r.provider - r.owner = info.owner or r.owner - r.provider_repo_name = info.repo_name - r.provider_description = info.description - r.default_branch = info.default_branch or r.default_branch - - r.latest_version = info.latest_version - r.latest_version_source = info.latest_version_source - - md: RepoMetadata = await fetch_repo_metadata(self.hass, r.url, r.default_branch) - r.meta_source = md.source - r.meta_name = md.name - r.meta_description = md.description - r.meta_category = md.category - r.meta_author = md.author - r.meta_maintainer = md.maintainer - - has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http")) - if r.meta_name: - r.name = r.meta_name - elif not has_user_or_index_name and r.provider_repo_name: - r.name = r.provider_repo_name - elif not r.name: - r.name = r.url - - await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True) - - async def _load_index_repos(self) -> tuple[list[RepoItem], int]: - session = async_get_clientsession(self.hass) - try: - async with session.get(self.config.store_url, timeout=20) as resp: - if resp.status != 200: - raise BCSError(f"store_url returned {resp.status}") - raw = await resp.text() - except Exception as e: - raise BCSError(f"Failed fetching store index: {e}") from e - - try: - data = ha_yaml.parse_yaml(raw) - if not isinstance(data, dict): - raise BCSError("store.yaml must be a mapping") - - refresh_seconds = int(data.get("refresh_seconds", 300)) - repos = data.get("repos", []) - if not isinstance(repos, list): - raise BCSError("store.yaml 'repos' must be a list") - - items: list[RepoItem] = [] - for i, r in enumerate(repos): - if not isinstance(r, dict): - continue - url = str(r.get("url", "")).strip() - if not url: - continue - name = str(r.get("name") or url).strip() - - items.append( - RepoItem( - id=f"index:{i}", - name=name, - url=url, - source="index", - ) - ) - - return items, refresh_seconds - except Exception as e: - raise BCSError(f"Invalid store.yaml: {e}") from e - - async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo: - repo = await self.storage.add_custom_repo(url=url, name=name) - await self.refresh() - self.signal_updated() - return repo - - async def remove_custom_repo(self, repo_id: str) -> None: - await self.storage.remove_custom_repo(repo_id) - await self.refresh() - self.signal_updated() - - async def list_custom_repos(self) -> list[CustomRepo]: - return await self.storage.list_custom_repos() - - def list_repos_public(self) -> list[dict[str, Any]]: - out: list[dict[str, Any]] = [] - for r in self.repos.values(): - resolved_description = r.meta_description or r.provider_description - out.append( - { - "id": r.id, - "name": r.name, - "url": r.url, - "source": r.source, - "owner": r.owner, - "provider": r.provider, - - "meta_source": r.meta_source, - "meta_name": r.meta_name, - "meta_description": r.meta_description, - "meta_category": r.meta_category, - "meta_author": r.meta_author, - "meta_maintainer": r.meta_maintainer, - - "provider_repo_name": r.provider_repo_name, - "provider_description": r.provider_description, - - "description": resolved_description, - "category": r.meta_category, - - "latest_version": r.latest_version, - "latest_version_source": r.latest_version_source, - } - ) - return out - - # ---------------------------- - # README fetching - # ---------------------------- - - def _normalize_repo_name(self, name: str | None) -> str | None: - if not name: - return None - n = name.strip() - if n.endswith(".git"): - n = n[:-4] - return n or None - - def _split_owner_repo(self, repo_url: str) -> tuple[str | None, str | None]: - u = urlparse(repo_url.rstrip("/")) - parts = [p for p in u.path.strip("/").split("/") if p] - if len(parts) < 2: - return None, None - owner = parts[0].strip() or None - repo = self._normalize_repo_name(parts[1]) - return owner, repo - - def _is_github(self, repo_url: str) -> bool: - return "github.com" in urlparse(repo_url).netloc.lower() - - def _is_gitea(self, repo_url: str) -> bool: - host = urlparse(repo_url).netloc.lower() - return host and "github.com" not in host and "gitlab.com" not in host - - async def _fetch_text(self, url: str) -> str | None: - session = async_get_clientsession(self.hass) - try: - async with session.get(url, timeout=20) as resp: - if resp.status != 200: - return None - return await resp.text() - except Exception: - return None - - async def fetch_readme_markdown(self, repo_id: str) -> str | None: - repo = self.get_repo(repo_id) - if not repo: - return None - - owner, name = self._split_owner_repo(repo.url) - if not owner or not name: - return None - - branch = repo.default_branch or "main" - filenames = ["README.md", "readme.md", "README.MD"] - - candidates: list[str] = [] - - if self._is_github(repo.url): - # raw github content - base = f"https://raw.githubusercontent.com/{owner}/{name}/{branch}" - candidates.extend([f"{base}/{fn}" for fn in filenames]) - - elif self._is_gitea(repo.url): - u = urlparse(repo.url.rstrip("/")) - root = f"{u.scheme}://{u.netloc}/{owner}/{name}" - - # gitea raw endpoints (both common forms) - bases = [ - f"{root}/raw/branch/{branch}", - f"{root}/raw/{branch}", - ] - for b in bases: - candidates.extend([f"{b}/{fn}" for fn in filenames]) - - else: - return None - - for url in candidates: - txt = await self._fetch_text(url) - if txt: - return txt - - return None