revert custom_components/bahmcloud_store/core.py updated
2026-01-15 14:38:00 +00:00
parent f3863ee227
commit 5684c3d5f1


@@ -6,14 +6,17 @@ import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml
from .providers import RepoInfo, fetch_repo_info
from .metadata import RepoMetadata, fetch_repo_metadata
from .storage import BCSStorage
from .views import StaticAssetsView, BCSApiView, BCSReadmeView, BCSCustomRepoView
from .storage import BCSStorage, CustomRepo
from .views import StaticAssetsView, BCSApiView, BCSReadmeView
from .custom_repo_view import BCSCustomRepoView
from .providers import fetch_repo_info, detect_provider, RepoInfo
from .metadata import fetch_repo_metadata, RepoMetadata
_LOGGER = logging.getLogger(__name__)
@@ -32,23 +35,19 @@ class BCSConfig:
@dataclass
class RepoItem:
id: str
name: str
url: str
name: str | None = None
description: str | None = None
category: str | None = None
source: str # "index" | "custom"
# derived/provider info
provider: str | None = None
owner: str | None = None
provider: str | None = None
provider_repo_name: str | None = None
provider_description: str | None = None
default_branch: str | None = None
# version
latest_version: str | None = None
latest_version_source: str | None = None
latest_version_source: str | None = None # "release" | "tag" | None
# metadata (bcs.yaml / hacs.yaml / hacs.json)
meta_source: str | None = None
meta_name: str | None = None
meta_description: str | None = None
@@ -56,18 +55,6 @@ class RepoItem:
meta_author: str | None = None
meta_maintainer: str | None = None
# source
source: str | None = None # "index" | "custom"
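# Derives a stable, filesystem-safe id from a repo URL by stripping the scheme and
# replacing "/" and ":" with "_", e.g. a hypothetical "https://github.com/acme/widget"
# becomes "github.com_acme_widget".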
def _stable_id_from_url(url: str) -> str:
u = (url or "").strip().rstrip("/")
if u.startswith("https://"):
u = u[8:]
elif u.startswith("http://"):
u = u[7:]
return u.replace("/", "_").replace(":", "_")
class BCSCore:
def __init__(self, hass: HomeAssistant, config: BCSConfig) -> None:
@@ -106,51 +93,35 @@ class BCSCore:
self.hass.http.register_view(BCSReadmeView(self))
self.hass.http.register_view(BCSCustomRepoView(self))
def list_repos_public(self) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for r in self.repos.values():
desc = r.meta_description or r.description or r.provider_description
name = r.name or r.meta_name or r.provider_repo_name or r.url
out.append(
{
"id": r.id,
"url": r.url,
"name": name,
"description": desc,
"category": r.meta_category or r.category,
"provider": r.provider,
"owner": r.owner,
"latest_version": r.latest_version,
"latest_version_source": r.latest_version_source,
"meta_source": r.meta_source,
"meta_author": r.meta_author,
"meta_maintainer": r.meta_maintainer,
"source": r.source,
}
)
return out
def get_repo(self, repo_id: str) -> RepoItem | None:
return self.repos.get(repo_id)
async def refresh(self) -> None:
index_repos, refresh_seconds = await self._load_index_repos()
self.refresh_seconds = int(refresh_seconds)
self.refresh_seconds = refresh_seconds
custom_repos = await self.storage.list_custom_repos()
merged: dict[str, RepoItem] = {}
for item in index_repos:
merged[item.id] = item
for c in custom_repos:
merged[c.id] = RepoItem(
id=c.id,
name=(c.name or c.url),
url=c.url,
name=c.name,
source="custom",
)
for r in merged.values():
r.provider = detect_provider(r.url)
await self._enrich_and_resolve(merged)
self.repos = merged
async def _enrich_and_resolve(self, merged: dict[str, RepoItem]) -> None:
sem = asyncio.Semaphore(6)
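# Cap enrichment at six repos in flight at once so a large index does not flood the
# provider APIs during a refresh.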
async def process_one(r: RepoItem) -> None:
@@ -174,7 +145,6 @@ class BCSCore:
r.meta_author = md.author
r.meta_maintainer = md.maintainer
# name override rules
has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http"))
if r.meta_name:
r.name = r.meta_name
@@ -183,81 +153,171 @@ class BCSCore:
elif not r.name:
r.name = r.url
# description fallback
if not r.description:
r.description = r.meta_description or r.provider_description
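# Net effect of the rules above: a metadata name (bcs.yaml/hacs) wins, an existing
# non-URL name from the index or the user is otherwise kept, and the bare URL is the
# last resort; the description falls back from metadata to provider data.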
await asyncio.gather(*(process_one(r) for r in merged.values()))
self.repos = merged
await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True)
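# return_exceptions=True keeps a single failing repo from aborting the whole refresh;
# its exception is collected instead of raised.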
async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
store_url = (self.config.store_url or "").strip()
if not store_url:
raise BCSError("Missing store_url")
from homeassistant.helpers.aiohttp_client import async_get_clientsession
session = async_get_clientsession(self.hass)
async with session.get(store_url, timeout=25) as resp:
if resp.status != 200:
raise BCSError(f"store.yaml HTTP {resp.status}")
raw = await resp.text()
try:
async with session.get(self.config.store_url, timeout=20) as resp:
if resp.status != 200:
raise BCSError(f"store_url returned {resp.status}")
raw = await resp.text()
except Exception as e:
raise BCSError(f"Failed fetching store index: {e}") from e
data = ha_yaml.load(raw) or {}
if not isinstance(data, dict):
raise BCSError("store.yaml must be a dict")
try:
data = ha_yaml.parse_yaml(raw)
if not isinstance(data, dict):
raise BCSError("store.yaml must be a mapping")
refresh_seconds = int(data.get("refresh_seconds", 300))
repos = data.get("repos", [])
if not isinstance(repos, list):
raise BCSError("store.yaml 'repos' must be a list")
refresh_seconds = int(data.get("refresh_seconds", 300))
repos = data.get("repos", [])
if not isinstance(repos, list):
raise BCSError("store.yaml 'repos' must be a list")
items: list[RepoItem] = []
for r in repos:
if not isinstance(r, dict):
continue
url = str(r.get("url", "")).strip()
if not url:
continue
rid = str(r.get("id") or _stable_id_from_url(url))
name = r.get("name")
name = str(name).strip() if isinstance(name, str) and name.strip() else None
items: list[RepoItem] = []
for i, r in enumerate(repos):
if not isinstance(r, dict):
continue
url = str(r.get("url", "")).strip()
if not url:
continue
name = str(r.get("name") or url).strip()
items.append(
RepoItem(
id=rid,
url=url,
name=name,
description=str(r.get("description")).strip() if isinstance(r.get("description"), str) else None,
category=str(r.get("category")).strip() if isinstance(r.get("category"), str) else None,
source="index",
items.append(
RepoItem(
id=f"index:{i}",
name=name,
url=url,
source="index",
)
)
)
return items, refresh_seconds
return items, refresh_seconds
except Exception as e:
raise BCSError(f"Invalid store.yaml: {e}") from e
async def add_custom_repo(self, url: str, name: str | None = None):
url = (url or "").strip()
if not url:
raise BCSError("Missing URL")
rid = _stable_id_from_url(url)
await self.storage.add_custom_repo(rid, url, name)
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
repo = await self.storage.add_custom_repo(url=url, name=name)
await self.refresh()
repo = self.repos.get(rid)
self.signal_updated()
return repo
async def remove_custom_repo(self, repo_id: str) -> None:
await self.storage.remove_custom_repo(repo_id)
await self.refresh()
self.signal_updated()
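# Both mutations follow the same pattern: persist the change via storage, rebuild
# self.repos with a full refresh(), then call signal_updated() to announce the new state.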
async def fetch_readme_markdown(self, repo_id: str) -> Any:
repo = self.repos.get(repo_id)
async def list_custom_repos(self) -> list[CustomRepo]:
return await self.storage.list_custom_repos()
def list_repos_public(self) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for r in self.repos.values():
resolved_description = r.meta_description or r.provider_description
out.append(
{
"id": r.id,
"name": r.name,
"url": r.url,
"source": r.source,
"owner": r.owner,
"provider": r.provider,
"meta_source": r.meta_source,
"meta_name": r.meta_name,
"meta_description": r.meta_description,
"meta_category": r.meta_category,
"meta_author": r.meta_author,
"meta_maintainer": r.meta_maintainer,
"provider_repo_name": r.provider_repo_name,
"provider_description": r.provider_description,
"description": resolved_description,
"category": r.meta_category,
"latest_version": r.latest_version,
"latest_version_source": r.latest_version_source,
}
)
return out
# ----------------------------
# README fetching
# ----------------------------
def _normalize_repo_name(self, name: str | None) -> str | None:
if not name:
return None
n = name.strip()
if n.endswith(".git"):
n = n[:-4]
return n or None
def _split_owner_repo(self, repo_url: str) -> tuple[str | None, str | None]:
u = urlparse(repo_url.rstrip("/"))
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
return None, None
owner = parts[0].strip() or None
repo = self._normalize_repo_name(parts[1])
return owner, repo
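# e.g. a hypothetical "https://git.example.com/acme/widget.git" yields ("acme", "widget");
# URLs without an owner/repo path yield (None, None).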
def _is_github(self, repo_url: str) -> bool:
return "github.com" in urlparse(repo_url).netloc.lower()
def _is_gitea(self, repo_url: str) -> bool:
host = urlparse(repo_url).netloc.lower()
return bool(host) and "github.com" not in host and "gitlab.com" not in host
async def _fetch_text(self, url: str) -> str | None:
session = async_get_clientsession(self.hass)
try:
async with session.get(url, timeout=20) as resp:
if resp.status != 200:
return None
return await resp.text()
except Exception:
return None
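# Any non-200 response or network error is swallowed and reported as None, so the
# README lookup below simply moves on to the next candidate URL.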
async def fetch_readme_markdown(self, repo_id: str) -> str | None:
repo = self.get_repo(repo_id)
if not repo:
return None
# provider readme is fetched by providers module
from .providers import fetch_readme
return await fetch_readme(self.hass, repo.url, repo.default_branch)
owner, name = self._split_owner_repo(repo.url)
if not owner or not name:
return None
branch = repo.default_branch or "main"
filenames = ["README.md", "readme.md", "README.MD"]
candidates: list[str] = []
if self._is_github(repo.url):
# raw github content
base = f"https://raw.githubusercontent.com/{owner}/{name}/{branch}"
candidates.extend([f"{base}/{fn}" for fn in filenames])
elif self._is_gitea(repo.url):
u = urlparse(repo.url.rstrip("/"))
root = f"{u.scheme}://{u.netloc}/{owner}/{name}"
# gitea raw endpoints (both common forms)
bases = [
f"{root}/raw/branch/{branch}",
f"{root}/raw/{branch}",
]
for b in bases:
candidates.extend([f"{b}/{fn}" for fn in filenames])
else:
return None
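# For a hypothetical GitHub repo https://github.com/acme/widget on branch "main", the first
# candidate is https://raw.githubusercontent.com/acme/widget/main/README.md; a self-hosted
# Gitea instance is tried at .../acme/widget/raw/branch/main/README.md and
# .../acme/widget/raw/main/README.md.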
for url in candidates:
txt = await self._fetch_text(url)
if txt:
return txt
return None