Files
bahmcloud_store/custom_components/bahmcloud_store/providers.py
2026-01-20 07:17:24 +00:00

681 lines
22 KiB
Python

from __future__ import annotations
import logging
import re
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from urllib.parse import quote_plus, urlparse
from packaging.version import InvalidVersion, Version
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
_LOGGER = logging.getLogger(__name__)
UA = "BahmcloudStore (Home Assistant)"
@dataclass
class RepoInfo:
owner: str | None = None
repo_name: str | None = None
description: str | None = None
provider: str | None = None
default_branch: str | None = None
latest_version: str | None = None
latest_version_source: str | None = None # "release" | "tag" | "atom" | None
def _normalize_repo_name(name: str | None) -> str | None:
if not name:
return None
n = name.strip()
if n.endswith(".git"):
n = n[:-4]
return n or None
def _split_owner_repo(repo_url: str) -> tuple[str | None, str | None]:
u = urlparse(repo_url.rstrip("/"))
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
return None, None
owner = parts[0].strip() or None
repo = _normalize_repo_name(parts[1])
return owner, repo
def detect_provider(repo_url: str) -> str:
host = urlparse(repo_url).netloc.lower()
if "github.com" in host:
return "github"
if "gitlab" in host:
return "gitlab"
return "gitea"
async def _safe_json(session, url: str, *, headers: dict | None = None, timeout: int = 20):
try:
async with session.get(url, timeout=timeout, headers=headers) as resp:
status = resp.status
if status != 200:
return None, status
return await resp.json(), status
except Exception:
return None, None
async def _safe_text(session, url: str, *, headers: dict | None = None, timeout: int = 20):
try:
async with session.get(url, timeout=timeout, headers=headers) as resp:
status = resp.status
if status != 200:
return None, status
return await resp.text(), status
except Exception:
return None, None
def _extract_tag_from_github_url(url: str) -> str | None:
m = re.search(r"/releases/tag/([^/?#]+)", url or "")
if not m:
return None
return m.group(1).strip() or None
def _extract_meta(html: str, *, prop: str | None = None, name: str | None = None) -> str | None:
if not html:
return None
if prop:
m = re.search(rf'<meta\s+property="{re.escape(prop)}"\s+content="([^"]+)"', html)
if m:
return m.group(1).strip()
if name:
m = re.search(rf'<meta\s+name="{re.escape(name)}"\s+content="([^"]+)"', html)
if m:
return m.group(1).strip()
return None
def _semver_key(tag: str) -> Version | None:
t = (tag or "").strip()
if not t:
return None
if t.startswith(("v", "V")):
t = t[1:]
try:
return Version(t)
except InvalidVersion:
return None
def _pick_highest_semver(tags: list[str]) -> str | None:
parsed: list[tuple[Version, str]] = []
for t in tags:
if not isinstance(t, str):
continue
ts = t.strip()
if not ts:
continue
v = _semver_key(ts)
if v is not None:
parsed.append((v, ts))
if not parsed:
return None
parsed.sort(key=lambda x: x[0], reverse=True)
return parsed[0][1]
async def _github_description_html(hass: HomeAssistant, owner: str, repo: str) -> str | None:
session = async_get_clientsession(hass)
url = f"https://github.com/{owner}/{repo}"
html, status = await _safe_text(session, url, headers={"User-Agent": UA})
if status != 200 or not html:
return None
desc = _extract_meta(html, prop="og:description")
if desc:
return desc
return _extract_meta(html, name="description")
async def _github_latest_version_atom(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
url = f"https://github.com/{owner}/{repo}/releases.atom"
atom, status = await _safe_text(session, url, headers={"User-Agent": UA})
if status != 200 or not atom:
return None, None
try:
root = ET.fromstring(atom)
ns = {"a": "http://www.w3.org/2005/Atom"}
entry = root.find("a:entry", ns)
if entry is None:
return None, None
link = entry.find("a:link", ns)
if link is not None and link.attrib.get("href"):
tag = _extract_tag_from_github_url(link.attrib["href"])
if tag:
return tag, "atom"
title = entry.find("a:title", ns)
if title is not None and title.text:
t = title.text.strip()
if t:
return t, "atom"
except Exception:
return None, None
return None, None
async def _github_latest_version_redirect(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
url = f"https://github.com/{owner}/{repo}/releases/latest"
try:
async with session.get(url, timeout=20, headers={"User-Agent": UA}, allow_redirects=True) as resp:
if resp.status != 200:
return None, None
final = str(resp.url)
tag = _extract_tag_from_github_url(final)
if tag:
return tag, "release"
except Exception:
return None, None
return None, None
async def _github_latest_version_api(
hass: HomeAssistant, owner: str, repo: str, *, github_token: str | None = None
) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
headers = _github_headers(github_token)
data, status = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/releases/latest", headers=headers)
if isinstance(data, dict) and data.get("tag_name"):
return str(data["tag_name"]), "release"
# No releases -> pick highest semver from many tags (instead of per_page=1)
if status == 404:
data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=100", headers=headers)
tags: list[str] = []
if isinstance(data, list):
for t in data:
if isinstance(t, dict) and t.get("name"):
tags.append(str(t["name"]))
best = _pick_highest_semver(tags)
if best:
return best, "tag"
# fallback: keep old behavior (first tag)
if tags:
return tags[0], "tag"
return None, None
async def _github_latest_version(
hass: HomeAssistant, owner: str, repo: str, *, github_token: str | None = None
) -> tuple[str | None, str | None]:
tag, src = await _github_latest_version_redirect(hass, owner, repo)
if tag:
return tag, src
tag, src = await _github_latest_version_api(hass, owner, repo, github_token=github_token)
if tag:
return tag, src
return await _github_latest_version_atom(hass, owner, repo)
async def _gitea_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
# releases: fetch multiple, pick highest semver (instead of limit=1)
data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/releases?limit=50")
rel_tags: list[str] = []
if isinstance(data, list):
for r in data:
if isinstance(r, dict) and r.get("tag_name"):
rel_tags.append(str(r["tag_name"]))
best_rel = _pick_highest_semver(rel_tags)
if best_rel:
return best_rel, "release"
if rel_tags:
return rel_tags[0], "release"
# tags: fetch multiple, pick highest semver (instead of limit=1)
data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/tags?limit=50")
tags: list[str] = []
if isinstance(data, list):
for t in data:
if isinstance(t, dict) and t.get("name"):
tags.append(str(t["name"]))
best = _pick_highest_semver(tags)
if best:
return best, "tag"
if tags:
return tags[0], "tag"
return None, None
async def _gitlab_latest_version(
hass: HomeAssistant, base: str, owner: str, repo: str
) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
headers = {"User-Agent": UA}
project = quote_plus(f"{owner}/{repo}")
# releases: fetch multiple, pick highest semver (instead of per_page=1)
data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}/releases?per_page=50", headers=headers)
rel_tags: list[str] = []
if isinstance(data, list):
for r in data:
if isinstance(r, dict) and r.get("tag_name"):
rel_tags.append(str(r["tag_name"]))
best_rel = _pick_highest_semver(rel_tags)
if best_rel:
return best_rel, "release"
if rel_tags:
return rel_tags[0], "release"
# tags: fetch multiple, pick highest semver (instead of per_page=1)
data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}/repository/tags?per_page=50", headers=headers)
tags: list[str] = []
if isinstance(data, list):
for t in data:
if isinstance(t, dict) and t.get("name"):
tags.append(str(t["name"]))
best = _pick_highest_semver(tags)
if best:
return best, "tag"
if tags:
return tags[0], "tag"
# atom fallback
atom, status = await _safe_text(session, f"{base}/{owner}/{repo}/-/tags?format=atom", headers=headers)
if status == 200 and atom:
try:
root = ET.fromstring(atom)
ns = {"a": "http://www.w3.org/2005/Atom"}
entry = root.find("a:entry", ns)
if entry is not None:
title = entry.find("a:title", ns)
if title is not None and title.text:
return title.text.strip(), "atom"
except Exception:
pass
return None, None
def _github_headers(github_token: str | None = None) -> dict:
headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
token = (github_token or "").strip()
if token:
# PAT or fine-grained token
headers["Authorization"] = f"Bearer {token}"
return headers
async def fetch_repo_info(hass: HomeAssistant, repo_url: str, *, github_token: str | None = None) -> RepoInfo:
provider = detect_provider(repo_url)
owner, repo = _split_owner_repo(repo_url)
info = RepoInfo(
owner=owner,
repo_name=repo,
description=None,
provider=provider,
default_branch=None,
latest_version=None,
latest_version_source=None,
)
if not owner or not repo:
return info
session = async_get_clientsession(hass)
try:
if provider == "github":
headers = _github_headers(github_token)
data, status = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}", headers=headers)
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("name")) or repo
info.default_branch = data.get("default_branch") or "main"
if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
info.owner = data["owner"]["login"]
else:
if status == 403:
_LOGGER.debug("GitHub API blocked/rate-limited for repo info %s/%s", owner, repo)
info.default_branch = "main"
if not info.description:
desc = await _github_description_html(hass, owner, repo)
if desc:
info.description = desc
ver, src = await _github_latest_version(hass, owner, repo, github_token=github_token)
info.latest_version = ver
info.latest_version_source = src
return info
if provider == "gitlab":
u = urlparse(repo_url.rstrip("/"))
base = f"{u.scheme}://{u.netloc}"
headers = {"User-Agent": UA}
project = quote_plus(f"{owner}/{repo}")
data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}", headers=headers)
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("path")) or repo
info.default_branch = data.get("default_branch") or "main"
ns = data.get("namespace")
if isinstance(ns, dict) and ns.get("path"):
info.owner = ns.get("path")
ver, src = await _gitlab_latest_version(hass, base, owner, repo)
info.latest_version = ver
info.latest_version_source = src
return info
if provider == "gitea":
u = urlparse(repo_url.rstrip("/"))
base = f"{u.scheme}://{u.netloc}"
data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}")
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("name")) or repo
info.default_branch = data.get("default_branch") or "main"
if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
info.owner = data["owner"]["login"]
ver, src = await _gitea_latest_version(hass, base, owner, repo)
info.latest_version = ver
info.latest_version_source = src
return info
except Exception as e:
_LOGGER.debug("fetch_repo_info failed for %s: %s", repo_url, e)
return info
async def fetch_readme_markdown(
hass: HomeAssistant,
repo_url: str,
*,
provider: str | None = None,
default_branch: str | None = None,
) -> str | None:
"""Fetch README Markdown for public repositories (GitHub/GitLab/Gitea).
Defensive behavior:
- tries multiple common README filenames
- tries multiple branches (default, main, master)
- uses public raw endpoints (no tokens required for public repositories)
"""
repo_url = (repo_url or "").strip()
if not repo_url:
return None
prov = (provider or "").strip().lower() if provider else ""
if not prov:
prov = detect_provider(repo_url)
branch_candidates: list[str] = []
if default_branch and str(default_branch).strip():
branch_candidates.append(str(default_branch).strip())
for b in ("main", "master"):
if b not in branch_candidates:
branch_candidates.append(b)
filenames = ["README.md", "readme.md", "README.MD", "README.rst", "README"]
session = async_get_clientsession(hass)
headers = {"User-Agent": UA}
def _normalize_gitlab_path(path: str) -> str | None:
p = (path or "").strip().strip("/")
if not p:
return None
parts = [x for x in p.split("/") if x]
if len(parts) < 2:
return None
if parts[-1].endswith(".git"):
parts[-1] = parts[-1][:-4]
return "/".join(parts)
candidates: list[str] = []
if prov == "github":
owner, repo = _split_owner_repo(repo_url)
if not owner or not repo:
return None
for branch in branch_candidates:
base = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}"
for fn in filenames:
candidates.append(f"{base}/{fn}")
elif prov == "gitea":
owner, repo = _split_owner_repo(repo_url)
if not owner or not repo:
return None
u = urlparse(repo_url.rstrip("/"))
root = f"{u.scheme}://{u.netloc}/{owner}/{repo}"
for branch in branch_candidates:
bases = [
f"{root}/raw/branch/{branch}",
f"{root}/raw/{branch}",
]
for b in bases:
for fn in filenames:
candidates.append(f"{b}/{fn}")
elif prov == "gitlab":
u = urlparse(repo_url.rstrip("/"))
path_repo = _normalize_gitlab_path(u.path)
if not path_repo:
return None
root = f"{u.scheme}://{u.netloc}/{path_repo}"
for branch in branch_candidates:
bases = [
f"{root}/-/raw/{branch}",
f"{root}/raw/{branch}",
]
for b in bases:
for fn in filenames:
candidates.append(f"{b}/{fn}")
else:
return None
for url in candidates:
try:
async with session.get(url, timeout=20, headers=headers) as resp:
if resp.status != 200:
continue
txt = await resp.text()
if txt and txt.strip():
return txt
except Exception:
continue
return None
async def fetch_repo_versions(
hass: HomeAssistant,
repo_url: str,
*,
provider: str | None = None,
default_branch: str | None = None,
limit: int = 20,
github_token: str | None = None,
) -> list[dict[str, str]]:
"""List available versions/refs for a repository.
Returns a list of dicts with keys:
- ref: the ref to install (tag/release/branch)
- label: human-friendly label
- source: release|tag|branch
Notes:
- Uses provider APIs; for GitHub we include the configured token (if any) to avoid unauthenticated rate limits.
- We prefer releases first (if available), then tags.
"""
repo_url = (repo_url or "").strip()
if not repo_url:
return []
prov = (provider or "").strip().lower() if provider else ""
if not prov:
prov = detect_provider(repo_url)
owner, repo = _split_owner_repo(repo_url)
if not owner or not repo:
return []
session = async_get_clientsession(hass)
headers = {"User-Agent": UA}
if prov == "github":
headers = _github_headers(github_token)
out: list[dict[str, str]] = []
seen: set[str] = set()
def _add(ref: str | None, label: str, source: str) -> None:
r = (ref or "").strip()
if not r or r in seen:
return
seen.add(r)
out.append({"ref": r, "label": label, "source": source})
# Always offer default branch as an explicit option.
if default_branch and str(default_branch).strip():
b = str(default_branch).strip()
_add(b, f"Branch: {b}", "branch")
try:
if prov == "github":
# Releases (prefer these over tags)
# Use the configured GitHub token (if any) to avoid unauthenticated rate limits.
gh_headers = _github_headers(github_token)
per_page = max(1, min(int(limit), 100))
data, _ = await _safe_json(
session,
f"https://api.github.com/repos/{owner}/{repo}/releases?per_page={per_page}",
headers=gh_headers,
)
if isinstance(data, list):
for r in data:
if not isinstance(r, dict):
continue
tag = r.get("tag_name")
name = r.get("name")
if tag:
lbl = str(tag)
if isinstance(name, str) and name.strip() and name.strip() != str(tag):
lbl = f"{tag}{name.strip()}"
_add(str(tag), lbl, "release")
# Tags
data, _ = await _safe_json(
session,
f"https://api.github.com/repos/{owner}/{repo}/tags?per_page={per_page}",
headers=gh_headers,
)
if isinstance(data, list):
for t in data:
if isinstance(t, dict) and t.get("name"):
_add(str(t["name"]), str(t["name"]), "tag")
return out
if prov == "gitlab":
u = urlparse(repo_url.rstrip("/"))
base = f"{u.scheme}://{u.netloc}"
project = quote_plus(f"{owner}/{repo}")
data, _ = await _safe_json(
session,
f"{base}/api/v4/projects/{project}/releases?per_page={int(limit)}",
headers=headers,
)
if isinstance(data, list):
for r in data:
if not isinstance(r, dict):
continue
tag = r.get("tag_name")
name = r.get("name")
if tag:
lbl = str(tag)
if isinstance(name, str) and name.strip() and name.strip() != str(tag):
lbl = f"{tag}{name.strip()}"
_add(str(tag), lbl, "release")
data, _ = await _safe_json(
session,
f"{base}/api/v4/projects/{project}/repository/tags?per_page={int(limit)}",
headers=headers,
)
if isinstance(data, list):
for t in data:
if isinstance(t, dict) and t.get("name"):
_add(str(t["name"]), str(t["name"]), "tag")
return out
# gitea (incl. Bahmcloud)
u = urlparse(repo_url.rstrip("/"))
base = f"{u.scheme}://{u.netloc}"
data, _ = await _safe_json(
session,
f"{base}/api/v1/repos/{owner}/{repo}/releases?limit={int(limit)}",
headers=headers,
)
if isinstance(data, list):
for r in data:
if not isinstance(r, dict):
continue
tag = r.get("tag_name")
name = r.get("name")
if tag:
lbl = str(tag)
if isinstance(name, str) and name.strip() and name.strip() != str(tag):
lbl = f"{tag}{name.strip()}"
_add(str(tag), lbl, "release")
data, _ = await _safe_json(
session,
f"{base}/api/v1/repos/{owner}/{repo}/tags?limit={int(limit)}",
headers=headers,
)
if isinstance(data, list):
for t in data:
if isinstance(t, dict) and t.get("name"):
_add(str(t["name"]), str(t["name"]), "tag")
return out
except Exception:
_LOGGER.debug("fetch_repo_versions failed for %s", repo_url, exc_info=True)
return out