diff --git a/custom_components/bahmcloud_store/providers.py b/custom_components/bahmcloud_store/providers.py
index 494f61d..e0561c6 100644
--- a/custom_components/bahmcloud_store/providers.py
+++ b/custom_components/bahmcloud_store/providers.py
@@ -10,9 +10,11 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
_LOGGER = logging.getLogger(__name__)
-
UA = "BahmcloudStore (Home Assistant)"
+_RE_GITHUB = re.compile(r"(?:^|/)github\.com/([^/]+)/([^/#?]+)", re.IGNORECASE)
+_RE_GITLAB = re.compile(r"(?:^|/)gitlab\.com/([^/]+)/([^/#?]+)", re.IGNORECASE)
+
@dataclass
class RepoInfo:
@@ -21,7 +23,6 @@ class RepoInfo:
description: str | None = None
provider: str | None = None
default_branch: str | None = None
-
latest_version: str | None = None
latest_version_source: str | None = None # "release" | "tag" | "atom" | None
@@ -37,251 +38,126 @@ def _normalize_repo_name(name: str | None) -> str | None:
def _split_owner_repo(repo_url: str) -> tuple[str | None, str | None]:
u = urlparse(repo_url.rstrip("/"))
- parts = [p for p in u.path.strip("/").split("/") if p]
+ parts = [p for p in u.path.split("/") if p]
if len(parts) < 2:
return None, None
- owner = parts[0].strip() or None
- repo = _normalize_repo_name(parts[1])
+ owner = parts[0]
+ repo = parts[1]
+ repo = repo[:-4] if repo.endswith(".git") else repo
return owner, repo
def detect_provider(repo_url: str) -> str:
- host = urlparse(repo_url).netloc.lower()
+ u = urlparse(repo_url.rstrip("/"))
+ host = (u.netloc or "").lower()
if "github.com" in host:
return "github"
- if "gitlab" in host:
+ if "gitlab.com" in host:
return "gitlab"
-
- owner, repo = _split_owner_repo(repo_url)
- if owner and repo:
- return "gitea"
-
- return "generic"
+ # gitea heuristic: /user/repo + typical endpoints handled elsewhere
+ return "gitea" if host else "other"
-async def _safe_json(session, url: str, *, headers: dict | None = None, timeout: int = 20):
+async def _safe_json(session, url: str, headers: dict) -> tuple[object | None, int]:
try:
- async with session.get(url, timeout=timeout, headers=headers) as resp:
- status = resp.status
+ async with session.get(url, headers=headers, timeout=20) as r:
+ status = r.status
if status != 200:
return None, status
- return await resp.json(), status
+ return await r.json(), status
except Exception:
- return None, None
+ return None, 0
-async def _safe_text(session, url: str, *, headers: dict | None = None, timeout: int = 20):
+async def _safe_text(session, url: str, headers: dict) -> tuple[str | None, int]:
try:
- async with session.get(url, timeout=timeout, headers=headers) as resp:
- status = resp.status
+ async with session.get(url, headers=headers, timeout=20) as r:
+ status = r.status
if status != 200:
return None, status
- return await resp.text(), status
+ return await r.text(), status
except Exception:
- return None, None
-
-
-def _extract_tag_from_github_url(url: str) -> str | None:
- m = re.search(r"/releases/tag/([^/?#]+)", url)
- if m:
- return m.group(1)
- m = re.search(r"/tag/([^/?#]+)", url)
- if m:
- return m.group(1)
- return None
+ return None, 0
def _strip_html(s: str) -> str:
- # minimal HTML entity cleanup for meta descriptions
- out = (
- s.replace("&", "&")
- .replace(""", '"')
- .replace("'", "'")
- .replace("<", "<")
- .replace(">", ">")
- )
- return re.sub(r"\s+", " ", out).strip()
-
-
-def _extract_meta(html: str, *, prop: str | None = None, name: str | None = None) -> str | None:
- # Extract
- # or
- if prop:
- # property="..." content="..."
- m = re.search(
- r']+property=["\']' + re.escape(prop) + r'["\'][^>]+content=["\']([^"\']+)["\']',
- html,
- flags=re.IGNORECASE,
- )
- if m:
- return _strip_html(m.group(1))
- m = re.search(
- r']+content=["\']([^"\']+)["\'][^>]+property=["\']' + re.escape(prop) + r'["\']',
- html,
- flags=re.IGNORECASE,
- )
- if m:
- return _strip_html(m.group(1))
-
- if name:
- m = re.search(
- r']+name=["\']' + re.escape(name) + r'["\'][^>]+content=["\']([^"\']+)["\']',
- html,
- flags=re.IGNORECASE,
- )
- if m:
- return _strip_html(m.group(1))
- m = re.search(
- r']+content=["\']([^"\']+)["\'][^>]+name=["\']' + re.escape(name) + r'["\']',
- html,
- flags=re.IGNORECASE,
- )
- if m:
- return _strip_html(m.group(1))
-
- return None
+ s = re.sub(r"<[^>]+>", " ", s or "")
+ s = re.sub(r"\s+", " ", s).strip()
+ return s
async def _github_description_html(hass: HomeAssistant, owner: str, repo: str) -> str | None:
- """
- GitHub API may be rate-limited; fetch public HTML and read meta description.
- """
session = async_get_clientsession(hass)
- headers = {
- "User-Agent": UA,
- "Accept": "text/html,application/xhtml+xml",
- }
-
- html, status = await _safe_text(session, f"https://github.com/{owner}/{repo}", headers=headers)
+ url = f"https://github.com/{owner}/{repo}"
+ html, status = await _safe_text(session, url, {"User-Agent": UA})
if not html or status != 200:
return None
- desc = _extract_meta(html, prop="og:description")
- if desc:
- return desc
+ # prefer og:description / description meta
+ m = re.search(r']+property="og:description"[^>]+content="([^"]+)"', html, re.IGNORECASE)
+ if m:
+ return _strip_html(m.group(1))
- desc = _extract_meta(html, name="description")
- if desc:
- return desc
+ m = re.search(r']+name="description"[^>]+content="([^"]+)"', html, re.IGNORECASE)
+ if m:
+ return _strip_html(m.group(1))
return None
-async def _github_latest_version_atom(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
- headers = {"User-Agent": UA, "Accept": "application/atom+xml,text/xml;q=0.9,*/*;q=0.8"}
-
- xml_text, _ = await _safe_text(session, f"https://github.com/{owner}/{repo}/releases.atom", headers=headers)
- if not xml_text:
- return None, None
-
- try:
- root = ET.fromstring(xml_text)
- except Exception:
- return None, None
-
- for entry in root.findall(".//{*}entry"):
- for link in entry.findall(".//{*}link"):
- href = link.attrib.get("href")
- if not href:
- continue
- tag = _extract_tag_from_github_url(href)
- if tag:
- return tag, "atom"
-
- return None, None
-
-
-async def _github_latest_version_redirect(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
- headers = {"User-Agent": UA}
- url = f"https://github.com/{owner}/{repo}/releases/latest"
- try:
- async with session.head(url, allow_redirects=False, timeout=15, headers=headers) as resp:
- if resp.status in (301, 302, 303, 307, 308):
- loc = resp.headers.get("Location")
- if loc:
- tag = _extract_tag_from_github_url(loc)
- if tag:
- return tag, "release"
- except Exception:
- pass
- return None, None
-
-
-async def _github_latest_version_api(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
- headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
-
- data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/releases/latest", headers=headers)
- if isinstance(data, dict):
- tag = data.get("tag_name") or data.get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "release"
-
- data, _ = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=1", headers=headers)
- if isinstance(data, list) and data:
- tag = data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "tag"
-
- return None, None
-
-
async def _github_latest_version(hass: HomeAssistant, owner: str, repo: str) -> tuple[str | None, str | None]:
- tag, src = await _github_latest_version_atom(hass, owner, repo)
- if tag:
- return tag, src
+ session = async_get_clientsession(hass)
+ headers = {"User-Agent": UA, "Accept": "application/vnd.github+json"}
- tag, src = await _github_latest_version_redirect(hass, owner, repo)
- if tag:
- return tag, src
+ # 1) latest release
+ data, st = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/releases/latest", headers)
+ if isinstance(data, dict) and data.get("tag_name"):
+ return str(data["tag_name"]), "release"
- return await _github_latest_version_api(hass, owner, repo)
+ # 2) tags (first)
+ data, st = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}/tags?per_page=1", headers)
+ if isinstance(data, list) and data:
+ t = data[0]
+ if isinstance(t, dict) and t.get("name"):
+ return str(t["name"]), "tag"
+
+ # 3) atom releases feed fallback (HTML blocked cases)
+ atom, st = await _safe_text(session, f"https://github.com/{owner}/{repo}/releases.atom", {"User-Agent": UA})
+ if atom and st == 200:
+ try:
+ root = ET.fromstring(atom)
+ ns = {"a": "http://www.w3.org/2005/Atom"}
+ entry = root.find("a:entry", ns)
+ if entry is not None:
+ title = entry.findtext("a:title", default="", namespaces=ns) or ""
+ title = title.strip()
+ # often: "v1.2.3" or "Release v1.2.3"
+ m = re.search(r"v?\d+\.\d+\.\d+([-\w\.]+)?", title)
+ if m:
+ return m.group(0), "atom"
+ except Exception:
+ pass
+
+ return None, None
async def _gitea_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]:
session = async_get_clientsession(hass)
+ headers = {"User-Agent": UA, "Accept": "application/json"}
- data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/releases?limit=1")
+ # releases latest
+ data, st = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/releases?limit=1", headers)
if isinstance(data, list) and data:
- tag = data[0].get("tag_name") or data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "release"
+ d0 = data[0]
+ if isinstance(d0, dict) and d0.get("tag_name"):
+ return str(d0["tag_name"]), "release"
- data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/tags?limit=1")
+ # tags latest
+ data, st = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}/tags?limit=1", headers)
if isinstance(data, list) and data:
- tag = data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "tag"
-
- return None, None
-
-
-async def _gitlab_latest_version(hass: HomeAssistant, base: str, owner: str, repo: str) -> tuple[str | None, str | None]:
- session = async_get_clientsession(hass)
- headers = {"User-Agent": UA}
- project = quote_plus(f"{owner}/{repo}")
-
- data, _ = await _safe_json(
- session,
- f"{base}/api/v4/projects/{project}/releases?per_page=1&order_by=released_at&sort=desc",
- headers=headers,
- )
- if isinstance(data, list) and data:
- tag = data[0].get("tag_name") or data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "release"
-
- data, _ = await _safe_json(
- session,
- f"{base}/api/v4/projects/{project}/repository/tags?per_page=1&order_by=updated&sort=desc",
- headers=headers,
- )
- if isinstance(data, list) and data:
- tag = data[0].get("name")
- if isinstance(tag, str) and tag.strip():
- return tag.strip(), "tag"
+ d0 = data[0]
+ if isinstance(d0, dict) and d0.get("name"):
+ return str(d0["name"]), "tag"
return None, None
@@ -307,10 +183,8 @@ async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo:
try:
if provider == "github":
- # Try API repo details (may be rate-limited)
- headers = {"Accept": "application/vnd.github+json", "User-Agent": UA}
- data, status = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}", headers=headers)
-
+ headers = {"User-Agent": UA, "Accept": "application/vnd.github+json"}
+ data, st = await _safe_json(session, f"https://api.github.com/repos/{owner}/{repo}", headers)
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("name")) or repo
@@ -318,12 +192,8 @@ async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo:
if isinstance(data.get("owner"), dict) and data["owner"].get("login"):
info.owner = data["owner"]["login"]
else:
- # If API blocked, still set reasonable defaults
- if status == 403:
- _LOGGER.debug("GitHub API blocked/rate-limited for repo info %s/%s", owner, repo)
info.default_branch = "main"
- # If description missing, fetch from GitHub HTML
if not info.description:
desc = await _github_description_html(hass, owner, repo)
if desc:
@@ -337,10 +207,10 @@ async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo:
if provider == "gitlab":
u = urlparse(repo_url.rstrip("/"))
base = f"{u.scheme}://{u.netloc}"
- headers = {"User-Agent": UA}
+ headers = {"User-Agent": UA, "Accept": "application/json"}
project = quote_plus(f"{owner}/{repo}")
- data, _ = await _safe_json(session, f"{base}/api/v4/projects/{project}", headers=headers)
+ data, st = await _safe_json(session, f"{base}/api/v4/projects/{project}", headers)
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("path")) or repo
@@ -349,16 +219,22 @@ async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo:
if isinstance(ns, dict) and ns.get("path"):
info.owner = ns.get("path")
- ver, src = await _gitlab_latest_version(hass, base, owner, repo)
- info.latest_version = ver
- info.latest_version_source = src
+ # latest tag (gitlab)
+ tags, st = await _safe_json(session, f"{base}/api/v4/projects/{project}/repository/tags?per_page=1", headers)
+ if isinstance(tags, list) and tags:
+ t0 = tags[0]
+ if isinstance(t0, dict) and t0.get("name"):
+ info.latest_version = str(t0["name"])
+ info.latest_version_source = "tag"
+
return info
if provider == "gitea":
u = urlparse(repo_url.rstrip("/"))
base = f"{u.scheme}://{u.netloc}"
+ headers = {"User-Agent": UA, "Accept": "application/json"}
- data, _ = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}")
+ data, st = await _safe_json(session, f"{base}/api/v1/repos/{owner}/{repo}", headers)
if isinstance(data, dict):
info.description = data.get("description")
info.repo_name = _normalize_repo_name(data.get("name")) or repo
@@ -375,4 +251,49 @@ async def fetch_repo_info(hass: HomeAssistant, repo_url: str) -> RepoInfo:
except Exception as e:
_LOGGER.debug("Provider fetch failed for %s: %s", repo_url, e)
- return info
\ No newline at end of file
+ return info
+
+
+async def fetch_readme(hass: HomeAssistant, repo_url: str, default_branch: str | None) -> object | None:
+ owner, repo = _split_owner_repo(repo_url)
+ if not owner or not repo:
+ return None
+
+ provider = detect_provider(repo_url)
+ branch = default_branch or "main"
+ session = async_get_clientsession(hass)
+
+ # GitHub: raw
+ if provider == "github":
+ for fn in ("README.md", "Readme.md", "readme.md"):
+ url = f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/{fn}"
+ txt, st = await _safe_text(session, url, {"User-Agent": UA})
+ if txt and st == 200:
+ return txt
+ return None
+
+ # GitLab raw
+ if provider == "gitlab":
+ u = urlparse(repo_url.rstrip("/"))
+ root = f"{u.scheme}://{u.netloc}/{owner}/{repo}"
+ for fn in ("README.md", "Readme.md", "readme.md"):
+ url = f"{root}/-/raw/{branch}/{fn}"
+ txt, st = await _safe_text(session, url, {"User-Agent": UA})
+ if txt and st == 200:
+ return txt
+ return None
+
+ # Gitea raw (supports both raw formats)
+ if provider == "gitea":
+ u = urlparse(repo_url.rstrip("/"))
+ root = f"{u.scheme}://{u.netloc}/{owner}/{repo}"
+ bases = [f"{root}/raw/branch/{branch}", f"{root}/raw/{branch}"]
+ for fn in ("README.md", "Readme.md", "readme.md"):
+ for b in bases:
+ url = f"{b}/{fn}"
+ txt, st = await _safe_text(session, url, {"User-Agent": UA})
+ if txt and st == 200:
+ return txt
+ return None
+
+ return None