custom_components/bahmcloud_store/core.py updated

This commit is contained in:
2026-01-15 14:30:54 +00:00
parent a2d123abbf
commit 1dbffcc27c

View File

@@ -6,17 +6,14 @@ import logging
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from urllib.parse import urlparse
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml from homeassistant.util import yaml as ha_yaml
from .storage import BCSStorage, CustomRepo from .providers import RepoInfo, fetch_repo_info
from .views import StaticAssetsView, BCSApiView, BCSReadmeView from .metadata import RepoMetadata, fetch_repo_metadata
from .custom_repo_view import BCSCustomRepoView from .storage import BCSStorage
from .providers import fetch_repo_info, detect_provider, RepoInfo from .views import StaticAssetsView, BCSApiView, BCSReadmeView, BCSCustomRepoView
from .metadata import fetch_repo_metadata, RepoMetadata
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -35,19 +32,23 @@ class BCSConfig:
@dataclass @dataclass
class RepoItem: class RepoItem:
id: str id: str
name: str
url: str url: str
source: str # "index" | "custom" name: str | None = None
description: str | None = None
category: str | None = None
owner: str | None = None # derived/provider info
provider: str | None = None provider: str | None = None
owner: str | None = None
provider_repo_name: str | None = None provider_repo_name: str | None = None
provider_description: str | None = None provider_description: str | None = None
default_branch: str | None = None default_branch: str | None = None
# version
latest_version: str | None = None latest_version: str | None = None
latest_version_source: str | None = None # "release" | "tag" | None latest_version_source: str | None = None
# metadata (bcs.yaml / hacs.yaml / hacs.json)
meta_source: str | None = None meta_source: str | None = None
meta_name: str | None = None meta_name: str | None = None
meta_description: str | None = None meta_description: str | None = None
@@ -55,6 +56,18 @@ class RepoItem:
meta_author: str | None = None meta_author: str | None = None
meta_maintainer: str | None = None meta_maintainer: str | None = None
# source
source: str | None = None # "index" | "custom"
def _stable_id_from_url(url: str) -> str:
    """Derive a stable, URL-safe repository id from a repository URL.

    The scheme is dropped so ``http://`` and ``https://`` variants of the
    same repo map to the same id, and path separators / colons are flattened
    to underscores.

    Args:
        url: Repository URL. ``None``/empty is tolerated; surrounding
            whitespace and trailing slashes are ignored.

    Returns:
        The derived id, or ``""`` when the URL is empty.
    """
    u = (url or "").strip().rstrip("/")
    # Use removeprefix instead of hard-coded slice offsets (u[8:], u[7:])
    # which could silently desync from the prefix literals.
    for scheme in ("https://", "http://"):
        if u.startswith(scheme):
            u = u.removeprefix(scheme)
            break
    return u.replace("/", "_").replace(":", "_")
class BCSCore: class BCSCore:
def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None: def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None:
@@ -93,35 +106,51 @@ class BCSCore:
self.hass.http.register_view(BCSReadmeView(self)) self.hass.http.register_view(BCSReadmeView(self))
self.hass.http.register_view(BCSCustomRepoView(self)) self.hass.http.register_view(BCSCustomRepoView(self))
def list_repos_public(self) -> list[dict[str, Any]]:
    """Build the public (JSON-serializable) listing of all known repos.

    Resolution order mirrors the enrichment step: metadata values win over
    the index entry, which wins over provider-derived values.
    """

    def as_public(repo: RepoItem) -> dict[str, Any]:
        # Best available human-facing name; the raw URL is the last resort.
        display_name = repo.name or repo.meta_name or repo.provider_repo_name or repo.url
        # Metadata description first, then the index entry, then the provider.
        display_desc = repo.meta_description or repo.description or repo.provider_description
        return {
            "id": repo.id,
            "url": repo.url,
            "name": display_name,
            "description": display_desc,
            "category": repo.meta_category or repo.category,
            "provider": repo.provider,
            "owner": repo.owner,
            "latest_version": repo.latest_version,
            "latest_version_source": repo.latest_version_source,
            "meta_source": repo.meta_source,
            "meta_author": repo.meta_author,
            "meta_maintainer": repo.meta_maintainer,
            "source": repo.source,
        }

    return [as_public(repo) for repo in self.repos.values()]
def get_repo(self, repo_id: str) -> RepoItem | None:
    """Look up a repo by id; return None when the id is unknown."""
    try:
        return self.repos[repo_id]
    except KeyError:
        return None
async def refresh(self) -> None: async def refresh(self) -> None:
index_repos, refresh_seconds = await self._load_index_repos() index_repos, refresh_seconds = await self._load_index_repos()
self.refresh_seconds = refresh_seconds self.refresh_seconds = int(refresh_seconds)
custom_repos = await self.storage.list_custom_repos() custom_repos = await self.storage.list_custom_repos()
merged: dict[str, RepoItem] = {} merged: dict[str, RepoItem] = {}
for item in index_repos: for item in index_repos:
merged[item.id] = item merged[item.id] = item
for c in custom_repos: for c in custom_repos:
merged[c.id] = RepoItem( merged[c.id] = RepoItem(
id=c.id, id=c.id,
name=(c.name or c.url),
url=c.url, url=c.url,
name=c.name,
source="custom", source="custom",
) )
for r in merged.values():
r.provider = detect_provider(r.url)
await self._enrich_and_resolve(merged)
self.repos = merged
async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None:
sem = asyncio.Semaphore(6) sem = asyncio.Semaphore(6)
async def process_one(r: RepoItem) -> None: async def process_one(r: RepoItem) -> None:
@@ -145,6 +174,7 @@ class BCSCore:
r.meta_author = md.author r.meta_author = md.author
r.meta_maintainer = md.maintainer r.meta_maintainer = md.maintainer
# name override rules (your requirement)
has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http")) has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http"))
if r.meta_name: if r.meta_name:
r.name = r.meta_name r.name = r.meta_name
@@ -153,171 +183,81 @@ class BCSCore:
elif not r.name: elif not r.name:
r.name = r.url r.name = r.url
await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True) # description fallback
if not r.description:
r.description = r.meta_description or r.provider_description
await asyncio.gather(*(process_one(r) for r in merged.values()))
self.repos = merged
async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
    """Download and parse store.yaml; return (index repos, refresh seconds).

    Raises:
        BCSError: when store_url is missing, the HTTP fetch fails, or the
            YAML does not have the expected shape.
    """
    store_url = (self.config.store_url or "").strip()
    if not store_url:
        raise BCSError("Missing store_url")

    # NOTE(review): imported here rather than at module top in the original;
    # kept local to preserve import-time behavior.
    from homeassistant.helpers.aiohttp_client import async_get_clientsession

    session = async_get_clientsession(self.hass)
    async with session.get(store_url, timeout=25) as resp:
        if resp.status != 200:
            raise BCSError(f"store.yaml HTTP {resp.status}")
        raw = await resp.text()

    data = ha_yaml.load(raw) or {}
    if not isinstance(data, dict):
        raise BCSError("store.yaml must be a dict")

    refresh_seconds = int(data.get("refresh_seconds", 300))
    entries = data.get("repos", [])
    if not isinstance(entries, list):
        raise BCSError("store.yaml 'repos' must be a list")

    items: list[RepoItem] = []
    for entry in entries:
        # Tolerate malformed rows instead of failing the whole index.
        if not isinstance(entry, dict):
            continue
        url = str(entry.get("url", "")).strip()
        if not url:
            continue
        raw_name = entry.get("name")
        # Name must be a non-empty string, otherwise fall back to None.
        name = str(raw_name).strip() if isinstance(raw_name, str) and raw_name.strip() else None
        desc = entry.get("description")
        cat = entry.get("category")
        items.append(
            RepoItem(
                # Explicit id wins; otherwise derive a stable one from the URL.
                id=str(entry.get("id") or _stable_id_from_url(url)),
                url=url,
                name=name,
                description=desc.strip() if isinstance(desc, str) else None,
                category=cat.strip() if isinstance(cat, str) else None,
                source="index",
            )
        )
    return items, refresh_seconds
async def add_custom_repo(self, url: str, name: str | None = None):
    """Persist a user-supplied repo, refresh state, and return the new item.

    Args:
        url: Repository URL supplied by the user.
        name: Optional display name override.

    Raises:
        BCSError: when the URL is empty.
    """
    cleaned = (url or "").strip()
    if not cleaned:
        raise BCSError("Missing URL")
    rid = _stable_id_from_url(cleaned)
    await self.storage.add_custom_repo(rid, cleaned, name)
    await self.refresh()
    self.signal_updated()
    # May be None if the refresh dropped the entry (e.g. storage race).
    return self.repos.get(rid)
async def remove_custom_repo(self, repo_id: str) -> None:
    """Delete a custom repo from storage, then rebuild and broadcast state."""
    # Remove from storage first so the subsequent refresh no longer sees it.
    await self.storage.remove_custom_repo(repo_id)
    await self.refresh()
    self.signal_updated()
async def list_custom_repos(self) -> list[CustomRepo]: async def fetch_readme_markdown(self, repo_id: str) -> Any:
return await self.storage.list_custom_repos() repo = self.repos.get(repo_id)
def list_repos_public(self) -> list[dict[str, Any]]:
    """Serialize every known repo into a plain dict for the HTTP API.

    Exposes the raw index/meta/provider fields plus two resolved
    conveniences: "description" (metadata wins over provider) and
    "category" (metadata only).
    """
    return [
        {
            "id": repo.id,
            "name": repo.name,
            "url": repo.url,
            "source": repo.source,
            "owner": repo.owner,
            "provider": repo.provider,
            "meta_source": repo.meta_source,
            "meta_name": repo.meta_name,
            "meta_description": repo.meta_description,
            "meta_category": repo.meta_category,
            "meta_author": repo.meta_author,
            "meta_maintainer": repo.meta_maintainer,
            "provider_repo_name": repo.provider_repo_name,
            "provider_description": repo.provider_description,
            # Resolved description: metadata first, provider as fallback.
            "description": repo.meta_description or repo.provider_description,
            "category": repo.meta_category,
            "latest_version": repo.latest_version,
            "latest_version_source": repo.latest_version_source,
        }
        for repo in self.repos.values()
    ]
# ----------------------------
# README fetching
# ----------------------------
def _normalize_repo_name(self, name: str | None) -> str | None:
    """Trim whitespace and a trailing ".git" from a repo name; None if empty."""
    if not name:
        return None
    cleaned = name.strip().removesuffix(".git")
    return cleaned or None
def _split_owner_repo(self, repo_url: str) -> tuple[str | None, str | None]:
    """Extract (owner, repo) from the first two path segments of a repo URL.

    Returns (None, None) when the URL has fewer than two path segments.
    """
    path = urlparse(repo_url.rstrip("/")).path
    segments = [seg for seg in path.strip("/").split("/") if seg]
    if len(segments) < 2:
        return None, None
    owner = segments[0].strip() or None
    return owner, self._normalize_repo_name(segments[1])
def _is_github(self, repo_url: str) -> bool:
    """Return True when the URL's host contains "github.com" (substring match)."""
    host = urlparse(repo_url).netloc.lower()
    return "github.com" in host
def _is_gitea(self, repo_url: str) -> bool:
    """Heuristic: treat any non-empty host that is neither github.com nor
    gitlab.com as a Gitea-style instance.

    Bug fix: the previous version returned the empty netloc string ("")
    instead of a bool when the URL had no host, violating the declared
    return type. Callers relying only on truthiness are unaffected.
    """
    host = urlparse(repo_url).netloc.lower()
    return bool(host) and "github.com" not in host and "gitlab.com" not in host
async def _fetch_text(self, url: str) -> str | None:
    """Best-effort GET returning the response body text, or None on any failure."""
    session = async_get_clientsession(self.hass)
    try:
        async with session.get(url, timeout=20) as resp:
            if resp.status == 200:
                return await resp.text()
    except Exception:  # deliberate best-effort: network errors become None
        pass
    return None
async def fetch_readme_markdown(self, repo_id: str) -> str | None:
    """Fetch the raw README markdown for a repo, or None when unavailable.

    Tries raw-content URLs for GitHub and Gitea-style hosts across common
    README filename spellings, returning the first non-empty body.
    """
    repo = self.get_repo(repo_id)
    if not repo:
        return None
    owner, name = self._split_owner_repo(repo.url)
    if not owner or not name:
        return None
    branch = repo.default_branch or "main"
    filenames = ["README.md", "readme.md", "README.MD"]
    if self._is_github(repo.url):
        # GitHub serves raw files from raw.githubusercontent.com.
        bases = [f"https://raw.githubusercontent.com/{owner}/{name}/{branch}"]
    elif self._is_gitea(repo.url):
        u = urlparse(repo.url.rstrip("/"))
        root = f"{u.scheme}://{u.netloc}/{owner}/{name}"
        # Gitea exposes raw content under two common URL forms.
        bases = [f"{root}/raw/branch/{branch}", f"{root}/raw/{branch}"]
    else:
        # Unknown provider: nothing to try.
        return None
    # Same candidate order as building the flat list up front.
    for base in bases:
        for fn in filenames:
            text = await self._fetch_text(f"{base}/{fn}")
            if text:
                return text
    return None