3 Commits

View File

@@ -198,10 +198,14 @@ class BCSCore:
raise BCSError(f"Invalid store.yaml: {e}") from e
async def add_custom_repo(self, url: str, name: str | None) -> CustomRepo:
    """Register a custom repository and refresh the index.

    The URL is normalised (coerced to ``str`` and stripped of surrounding
    whitespace) *before* it is handed to storage, so a blank or
    whitespace-only value is rejected instead of being persisted.

    Args:
        url: Repository URL supplied by the caller.
        name: Optional display name, passed to storage unchanged.

    Returns:
        The object returned by ``self.storage.add_custom_repo``.

    Raises:
        BCSError: If ``url`` is empty after normalisation.
    """
    url = str(url or "").strip()
    if not url:
        raise BCSError("Missing url")

    # Persist exactly once, and only after validation; a second storage call
    # would register the same repository twice.
    repo = await self.storage.add_custom_repo(url=url, name=name)

    # Rebuild the in-memory state, then notify listeners about the change.
    await self.refresh()
    self.signal_updated()
    return repo
async def remove_custom_repo(self, repo_id: str) -> None:
await self.storage.remove_custom_repo(repo_id)
@@ -214,7 +218,6 @@ class BCSCore:
def list_repos_public(self) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for r in self.repos.values():
resolved_description = r.meta_description or r.provider_description
out.append(
{
"id": r.id,
@@ -223,46 +226,30 @@ class BCSCore:
"source": r.source,
"owner": r.owner,
"provider": r.provider,
"meta_source": r.meta_source,
"meta_name": r.meta_name,
"meta_description": r.meta_description,
"meta_category": r.meta_category,
"meta_author": r.meta_author,
"meta_maintainer": r.meta_maintainer,
"provider_repo_name": r.provider_repo_name,
"provider_description": r.provider_description,
"description": resolved_description,
"category": r.meta_category,
"repo_name": r.provider_repo_name,
"description": r.provider_description or r.meta_description,
"default_branch": r.default_branch,
"latest_version": r.latest_version,
"latest_version_source": r.latest_version_source,
"category": r.meta_category,
"meta_author": r.meta_author,
"meta_maintainer": r.meta_maintainer,
"meta_source": r.meta_source,
}
)
return out
# ----------------------------
# README fetching
# ----------------------------
def _normalize_repo_name(self, name: str | None) -> str | None:
if not name:
return None
n = name.strip()
if n.endswith(".git"):
n = n[:-4]
return n or None
def _split_owner_repo(self, repo_url: str) -> tuple[str | None, str | None]:
u = urlparse(repo_url.rstrip("/"))
parts = [p for p in u.path.strip("/").split("/") if p]
if len(parts) < 2:
return None, None
owner = parts[0].strip() or None
repo = self._normalize_repo_name(parts[1])
return owner, repo
name = parts[1].strip()
if name.endswith(".git"):
name = name[:-4]
name = name.strip() or None
return owner, name
def _is_github(self, repo_url: str) -> bool:
return "github.com" in urlparse(repo_url).netloc.lower()
@@ -282,42 +269,93 @@ class BCSCore:
return None
async def fetch_readme_markdown(self, repo_id: str) -> str | None:
    """Fetch README text for a repository on GitHub, GitLab or Gitea.

    Defensive behavior:
    - tries multiple common filenames
    - tries multiple branches (default branch first, then main, master)
    - uses public raw endpoints (no tokens required for public repositories)

    Args:
        repo_id: Identifier passed to ``self.get_repo``.

    Returns:
        The first non-blank document returned by ``self._fetch_text`` for
        any candidate URL, or ``None`` when the repository is unknown, its
        URL cannot be parsed, the host is not recognised, or no candidate
        yields content.
    """
    repo = self.get_repo(repo_id)
    if not repo:
        return None

    repo_url = (repo.url or "").strip()
    if not repo_url:
        return None

    # Branch fallbacks: the repository's own default branch goes first,
    # followed by the two conventional names, without duplicates.
    branch_candidates: list[str] = []
    default_branch = str(repo.default_branch or "").strip()
    if default_branch:
        branch_candidates.append(default_branch)
    for fallback in ("main", "master"):
        if fallback not in branch_candidates:
            branch_candidates.append(fallback)

    # Filename fallbacks
    filenames = ["README.md", "readme.md", "README.MD", "README.rst", "README"]

    provider = (repo.provider or "").strip().lower()
    if not provider:
        provider = detect_provider(repo_url) or ""

    u = urlparse(repo_url.rstrip("/"))
    host = (u.netloc or "").lower()

    candidates: list[str] = []

    if self._is_github(repo_url):
        owner, name = self._split_owner_repo(repo_url)
        if not owner or not name:
            return None
        for branch in branch_candidates:
            base = f"https://raw.githubusercontent.com/{owner}/{name}/{branch}"
            candidates.extend(f"{base}/{fn}" for fn in filenames)

    elif provider == "gitlab" or "gitlab" in host:
        # GitLab can have nested groups: /group/subgroup/repo
        parts = [p for p in u.path.strip("/").split("/") if p]
        if len(parts) < 2:
            return None
        repo_name = parts[-1].strip()
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-4]
        group_path = "/".join(parts[:-1]).strip("/")
        if not group_path or not repo_name:
            return None
        root = f"{u.scheme}://{u.netloc}/{group_path}/{repo_name}"
        for branch in branch_candidates:
            bases = [
                f"{root}/-/raw/{branch}",
                # Some instances may expose /raw/<branch> as well
                f"{root}/raw/{branch}",
            ]
            for base in bases:
                candidates.extend(f"{base}/{fn}" for fn in filenames)

    elif self._is_gitea(repo_url):
        owner, name = self._split_owner_repo(repo_url)
        if not owner or not name:
            return None
        root = f"{u.scheme}://{u.netloc}/{owner}/{name}"
        for branch in branch_candidates:
            # gitea raw endpoints (both common forms)
            bases = [
                f"{root}/raw/branch/{branch}",
                f"{root}/raw/{branch}",
            ]
            for base in bases:
                candidates.extend(f"{base}/{fn}" for fn in filenames)

    else:
        return None

    # Candidates are probed in order; a whitespace-only response is treated
    # as a miss so the next candidate is tried.
    for url in candidates:
        txt = await self._fetch_text(url)
        if txt and txt.strip():
            return txt
    return None