custom_components/bahmcloud_store/core.py aktualisiert

This commit is contained in:
2026-01-15 09:20:16 +00:00
parent d27782ea9c
commit c500234e1d

View File

@@ -6,13 +6,14 @@ import logging
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from urllib.parse import urlparse
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.util import yaml as ha_yaml from homeassistant.util import yaml as ha_yaml
from .storage import BCSStorage, CustomRepo from .storage import BCSStorage, CustomRepo
from .views import StaticAssetsView, BCSApiView from .views import StaticAssetsView, BCSApiView, BCSReadmeView
from .custom_repo_view import BCSCustomRepoView from .custom_repo_view import BCSCustomRepoView
from .providers import fetch_repo_info, detect_provider, RepoInfo from .providers import fetch_repo_info, detect_provider, RepoInfo
from .metadata import fetch_repo_metadata, RepoMetadata from .metadata import fetch_repo_metadata, RepoMetadata
@@ -88,8 +89,12 @@ class BCSCore:
async def register_http_views(self) -> None: async def register_http_views(self) -> None:
self.hass.http.register_view(StaticAssetsView()) self.hass.http.register_view(StaticAssetsView())
self.hass.http.register_view(BCSApiView(self)) self.hass.http.register_view(BCSApiView(self))
self.hass.http.register_view(BCSReadmeView(self))
self.hass.http.register_view(BCSCustomRepoView(self)) self.hass.http.register_view(BCSCustomRepoView(self))
def get_repo(self, repo_id: str) -> RepoItem | None:
return self.repos.get(repo_id)
async def refresh(self) -> None: async def refresh(self) -> None:
index_repos, refresh_seconds = await self._load_index_repos() index_repos, refresh_seconds = await self._load_index_repos()
self.refresh_seconds = refresh_seconds self.refresh_seconds = refresh_seconds
@@ -121,7 +126,7 @@ class BCSCore:
async def process_one(r: RepoItem) -> None: async def process_one(r: RepoItem) -> None:
async with sem: async with sem:
# 1) Provider info (owner/description/default_branch/repo_name) # 1) Provider info
info: RepoInfo = await fetch_repo_info(self.hass, r.url) info: RepoInfo = await fetch_repo_info(self.hass, r.url)
r.provider = info.provider or r.provider r.provider = info.provider or r.provider
r.owner = info.owner or r.owner r.owner = info.owner or r.owner
@@ -139,23 +144,15 @@ class BCSCore:
r.meta_maintainer = md.maintainer r.meta_maintainer = md.maintainer
# 3) Resolve final display name (priority) # 3) Resolve final display name (priority)
# 1) metadata name
# 2) existing name (store.yaml or user-provided custom name)
# 3) provider repo name
# 4) url
has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http")) has_user_or_index_name = bool(r.name) and (r.name != r.url) and (not str(r.name).startswith("http"))
if r.meta_name: if r.meta_name:
r.name = r.meta_name r.name = r.meta_name
elif not has_user_or_index_name and r.provider_repo_name: elif not has_user_or_index_name and r.provider_repo_name:
# Only use provider repo name if no metadata AND no user/index name was given. # Only use provider repo name if no metadata AND no user/index name.
r.name = r.provider_repo_name r.name = r.provider_repo_name
elif not r.name: elif not r.name:
r.name = r.url r.name = r.url
# 4) Resolve description shown in UI: metadata first, then provider
# (The UI will also prefer meta_description; we just keep fields.)
return
await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True) await asyncio.gather(*(process_one(r) for r in merged.values()), return_exceptions=True)
async def _load_index_repos(self) -> tuple[list[RepoItem], int]: async def _load_index_repos(self) -> tuple[list[RepoItem], int]:
@@ -218,6 +215,8 @@ class BCSCore:
def list_repos_public(self) -> list[dict[str, Any]]: def list_repos_public(self) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = [] out: list[dict[str, Any]] = []
for r in self.repos.values(): for r in self.repos.values():
resolved_description = r.meta_description or r.provider_description
out.append( out.append(
{ {
"id": r.id, "id": r.id,
@@ -236,6 +235,81 @@ class BCSCore:
"provider_repo_name": r.provider_repo_name, "provider_repo_name": r.provider_repo_name,
"provider_description": r.provider_description, "provider_description": r.provider_description,
# UI convenience:
"description": resolved_description,
"category": r.meta_category,
} }
) )
return out return out
# ----------------------------
# README fetching (0.4.0)
# ----------------------------
def _split_owner_repo(self, repo_url: str) -> tuple[str | None, str | None]:
u = urlparse(repo_url.rstrip("/"))
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
return None, None
return parts[0], parts[1]
def _is_github(self, repo_url: str) -> bool:
return "github.com" in urlparse(repo_url).netloc.lower()
def _is_gitea(self, repo_url: str) -> bool:
host = urlparse(repo_url).netloc.lower()
return host and "github.com" not in host and "gitlab.com" not in host
async def _fetch_text(self, url: str) -> str | None:
session = async_get_clientsession(self.hass)
try:
async with session.get(url, timeout=20) as resp:
if resp.status != 200:
return None
return await resp.text()
except Exception:
return None
async def fetch_readme_markdown(self, repo_id: str) -> str | None:
"""
Try to fetch README.md from repository root (best-effort).
Supports GitHub and Gitea raw URLs.
"""
repo = self.get_repo(repo_id)
if not repo:
return None
owner, name = self._split_owner_repo(repo.url)
if not owner or not name:
return None
branch = repo.default_branch or "main"
filenames = ["README.md", "readme.md", "README.MD"]
candidates: list[str] = []
if self._is_github(repo.url):
base = f"https://raw.githubusercontent.com/{owner}/{name}/{branch}"
candidates.extend([f"{base}/{fn}" for fn in filenames])
elif self._is_gitea(repo.url):
u = urlparse(repo.url.rstrip("/"))
root = f"{u.scheme}://{u.netloc}/{owner}/{name}"
bases = [
f"{root}/raw/branch/{branch}",
f"{root}/raw/{branch}",
]
for b in bases:
candidates.extend([f"{b}/{fn}" for fn in filenames])
else:
return None
for url in candidates:
txt = await self._fetch_text(url)
if txt:
return txt
return None