From 166c2f375e2d8ba1a701e727bc5482793154fadf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ren=C3=A9=20Bachmann?= Date: Wed, 14 Jan 2026 09:02:58 +0100 Subject: [PATCH] Add diagnostics for Proxmox config entries Implement diagnostics for Proxmox configuration entries with public-safe sanitization for sensitive data. --- custom_components/proxmox_pve/diagnostics.py | 340 +++++++++++++++++++ 1 file changed, 340 insertions(+) create mode 100644 custom_components/proxmox_pve/diagnostics.py diff --git a/custom_components/proxmox_pve/diagnostics.py b/custom_components/proxmox_pve/diagnostics.py new file mode 100644 index 0000000..9acc6cd --- /dev/null +++ b/custom_components/proxmox_pve/diagnostics.py @@ -0,0 +1,340 @@ +from __future__ import annotations + +import re +from typing import Any, Awaitable, Callable + +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant + +from .const import DOMAIN + + +# --------------------------- +# Masking helpers +# --------------------------- + +_IPV4_RE = re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b") + + +def _mask_ipv4(ip: str) -> str: + """Mask IPv4 addresses (keep first two octets). Example: 192.168.178.101 -> 192.168.xxx.xxx""" + parts = ip.split(".") + if len(parts) != 4: + return ip + return f"{parts[0]}.{parts[1]}.xxx.xxx" + + +def _mask_ipv4_in_text(text: str) -> str: + """Replace any IPv4 occurrences inside a string.""" + return _IPV4_RE.sub(lambda m: _mask_ipv4(m.group(0)), text) + + +def _mask_token_name(value: Any) -> Any: + """Show only first 2 and last 2 chars of token_name.""" + if not isinstance(value, str): + return value + s = value.strip() + if len(s) <= 4: + return "*" * len(s) if s else s + return f"{s[:2]}***{s[-2:]}" + + +def _redact_secret(value: Any) -> Any: + """Redact secrets while keeping structure intact.""" + if value is None: + return None + if isinstance(value, str): + s = value.strip() + if not s: + return value + if len(s) <= 6: + return "***" + return s[:3] + "***" + s[-3:] + if isinstance(value, dict): + return {k: _redact_secret(v) for k, v in value.items()} + if isinstance(value, list): + return [_redact_secret(v) for v in value] + return value + + +def _sanitize_public(value: Any) -> Any: + """ + Public-safe sanitization: + - Mask all IPv4 strings anywhere + - Keep structure + """ + if value is None: + return None + if isinstance(value, str): + return _mask_ipv4_in_text(value) + if isinstance(value, dict): + return {str(k): _sanitize_public(v) for k, v in value.items()} + if isinstance(value, list): + return [_sanitize_public(v) for v in value] + if isinstance(value, tuple): + return [_sanitize_public(v) for v in value] + return value + + +# --------------------------- +# Coordinator helpers +# --------------------------- + +def _safe_coordinator_state(coord: Any) -> dict[str, Any]: + if not coord: + return {} + data = getattr(coord, "data", None) + preview = None + if isinstance(data, list): + preview = data[:3] + return { + "name": getattr(coord, "name", None), + "update_interval": str(getattr(coord, "update_interval", None)), + "last_update_success": getattr(coord, "last_update_success", None), + "last_exception": repr(getattr(coord, "last_exception", None)), + "data_type": type(data).__name__, + "data_preview": preview, + } + + +def _stringify_key(key: Any) -> str: + if isinstance(key, tuple): + return ":".join(str(x) for x in key) + return str(key) + + +def _safe_coord_map(coord_map: Any) -> dict[str, Any]: + if not isinstance(coord_map, dict): + return {} + return {_stringify_key(k): _safe_coordinator_state(v) for k, v in coord_map.items()} + + +# --------------------------- +# Proxmox client access +# --------------------------- + +async def _try_call(func: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any) -> Any: + return await func(*args, **kwargs) + + +async def _client_get_json(client: Any, path: str) -> Any: + """ + Best-effort JSON GET against the Proxmox client. + Tries multiple method names to stay compatible with different client styles. + """ + if client is None: + return None + + if not path.startswith("/"): + path = "/" + path + + candidates = [] + for name in ( + "get", + "api_get", + "request", + "api_request", + "_request", + "_api_request", + "get_json", + "async_get", + ): + fn = getattr(client, name, None) + if callable(fn): + candidates.append((name, fn)) + + last_err: Exception | None = None + + for name, fn in candidates: + try: + if name in ("request", "api_request", "_request", "_api_request"): + data = await _try_call(fn, "GET", path) + else: + data = await _try_call(fn, path) + + # Unwrap {"data": ...} responses + if isinstance(data, dict) and "data" in data and len(data) == 1: + return data["data"] + + return data + except Exception as err: # noqa: BLE001 + last_err = err + continue + + if last_err: + return { + "error": f"Could not query {path} via client " + f"({type(last_err).__name__}: {last_err})" + } + + return {"error": f"Could not query {path} via client (no compatible method found)"} + + +# --------------------------- +# Proxmox data processing +# --------------------------- + +def _count_resources(resources_data: Any) -> dict[str, int]: + counts = { + "nodes": 0, + "vms": 0, + "containers": 0, + "total_guests": 0, + } + if not isinstance(resources_data, list): + return counts + + for r in resources_data: + if not isinstance(r, dict): + continue + r_type = r.get("type") + if r_type == "node": + counts["nodes"] += 1 + elif r_type == "qemu": + counts["vms"] += 1 + counts["total_guests"] += 1 + elif r_type == "lxc": + counts["containers"] += 1 + counts["total_guests"] += 1 + + return counts + + +def _extract_cluster_info(cluster_status: Any) -> dict[str, Any] | None: + if not isinstance(cluster_status, list): + return None + + cluster_name = None + node_count = 0 + + for item in cluster_status: + if not isinstance(item, dict): + continue + if item.get("type") == "cluster": + cluster_name = item.get("name") or cluster_name + if item.get("type") == "node": + node_count += 1 + + return { + "name": cluster_name, + "nodes": node_count, + "raw_preview": cluster_status[:5], + } + + +# --------------------------- +# Main diagnostics entrypoint +# --------------------------- + +async def async_get_config_entry_diagnostics( + hass: HomeAssistant, entry: ConfigEntry +) -> dict[str, Any]: + """Return diagnostics for a config entry (public-safe, JSON serializable).""" + domain_data = hass.data.get(DOMAIN, {}).get(entry.entry_id, {}) or {} + + client = domain_data.get("client") + resources = domain_data.get("resources") + nodes = domain_data.get("nodes") + guest_coordinators = domain_data.get("guest_coordinators", {}) or {} + node_coordinators = domain_data.get("node_coordinators", {}) or {} + + # ---- Entry data/options with focused redaction ---- + entry_data = dict(entry.data) + # Mask secrets + for key in ("token_value", "password", "api_key", "secret"): + if key in entry_data: + entry_data[key] = _redact_secret(entry_data[key]) + # Mask token_name per requirement + if "token_name" in entry_data: + entry_data["token_name"] = _mask_token_name(entry_data["token_name"]) + # Host/IP should be masked + if "host" in entry_data and isinstance(entry_data["host"], str): + entry_data["host"] = _mask_ipv4_in_text(entry_data["host"]) + + options = dict(entry.options) + # ip_prefix may reveal network; mask if it looks like an IP-ish prefix + if "ip_prefix" in options and isinstance(options["ip_prefix"], str): + options["ip_prefix"] = _mask_ipv4_in_text(options["ip_prefix"]) + + # ---- Resource previews & counts ---- + res_preview = None + res_counts = {"nodes": 0, "vms": 0, "containers": 0, "total_guests": 0} + try: + if resources and isinstance(resources.data, list): + res_counts = _count_resources(resources.data) + res_preview = [ + { + "type": r.get("type"), + "node": r.get("node"), + "vmid": r.get("vmid"), + "name": r.get("name"), + "status": r.get("status"), + } + for r in resources.data[:25] + if isinstance(r, dict) + ] + except Exception: # noqa: BLE001 + res_preview = None + + node_preview = None + try: + if nodes and isinstance(nodes.data, list): + node_preview = [ + { + "node": n.get("node"), + "status": n.get("status"), + "uptime": n.get("uptime"), + "cpu": n.get("cpu"), + "mem": n.get("mem"), + "maxmem": n.get("maxmem"), + } + for n in nodes.data[:15] + if isinstance(n, dict) + ] + except Exception: # noqa: BLE001 + node_preview = None + + # ---- Proxmox meta information ---- + version_info = await _client_get_json(client, "/version") + cluster_status = await _client_get_json(client, "/cluster/status") + cluster_info = _extract_cluster_info(cluster_status) + + proxmox_meta = { + "version": version_info, + "cluster": cluster_info, + "counts": res_counts, + } + + result = { + "entry": { + "entry_id": entry.entry_id, + # Title can contain IPs (like "Proxmox 192.168.x.x") -> mask it + "title": _mask_ipv4_in_text(entry.title), + "data": entry_data, + "options": options, + }, + "runtime": { + "client": { + "host": _mask_ipv4_in_text(str(getattr(client, "host", ""))) if client else None, + "port": getattr(client, "port", None) if client else None, + "verify_ssl": getattr(client, "verify_ssl", None) if client else None, + }, + "scan_interval": domain_data.get("scan_interval"), + "ip_mode": domain_data.get("ip_mode"), + "ip_prefix": _mask_ipv4_in_text(str(domain_data.get("ip_prefix"))) if domain_data.get("ip_prefix") else None, + }, + "proxmox": proxmox_meta, + "coordinators": { + "resources": _safe_coordinator_state(resources), + "nodes": _safe_coordinator_state(nodes), + "node_coordinators": _safe_coord_map(node_coordinators), + "guest_coordinators": _safe_coord_map(guest_coordinators), + }, + "data_preview": { + "resources_preview": res_preview, + "nodes_preview": node_preview, + }, + } + + # FINAL PASS: mask any IPv4 that still appears anywhere (including cluster raw_preview "ip") + return _sanitize_public(result)