Add diagnostics for Proxmox config entries

Implement diagnostics for Proxmox configuration entries with public-safe sanitization for sensitive data.
This commit is contained in:
2026-01-14 09:02:58 +01:00
committed by GitHub
parent 3954d8f94f
commit 166c2f375e

View File

@@ -0,0 +1,340 @@
from __future__ import annotations
import re
from typing import Any, Awaitable, Callable
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import DOMAIN
# ---------------------------
# Masking helpers
# ---------------------------
_IPV4_RE = re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b")
def _mask_ipv4(ip: str) -> str:
"""Mask IPv4 addresses (keep first two octets). Example: 192.168.178.101 -> 192.168.xxx.xxx"""
parts = ip.split(".")
if len(parts) != 4:
return ip
return f"{parts[0]}.{parts[1]}.xxx.xxx"
def _mask_ipv4_in_text(text: str) -> str:
"""Replace any IPv4 occurrences inside a string."""
return _IPV4_RE.sub(lambda m: _mask_ipv4(m.group(0)), text)
def _mask_token_name(value: Any) -> Any:
"""Show only first 2 and last 2 chars of token_name."""
if not isinstance(value, str):
return value
s = value.strip()
if len(s) <= 4:
return "*" * len(s) if s else s
return f"{s[:2]}***{s[-2:]}"
def _redact_secret(value: Any) -> Any:
"""Redact secrets while keeping structure intact."""
if value is None:
return None
if isinstance(value, str):
s = value.strip()
if not s:
return value
if len(s) <= 6:
return "***"
return s[:3] + "***" + s[-3:]
if isinstance(value, dict):
return {k: _redact_secret(v) for k, v in value.items()}
if isinstance(value, list):
return [_redact_secret(v) for v in value]
return value
def _sanitize_public(value: Any) -> Any:
"""
Public-safe sanitization:
- Mask all IPv4 strings anywhere
- Keep structure
"""
if value is None:
return None
if isinstance(value, str):
return _mask_ipv4_in_text(value)
if isinstance(value, dict):
return {str(k): _sanitize_public(v) for k, v in value.items()}
if isinstance(value, list):
return [_sanitize_public(v) for v in value]
if isinstance(value, tuple):
return [_sanitize_public(v) for v in value]
return value
# ---------------------------
# Coordinator helpers
# ---------------------------
def _safe_coordinator_state(coord: Any) -> dict[str, Any]:
if not coord:
return {}
data = getattr(coord, "data", None)
preview = None
if isinstance(data, list):
preview = data[:3]
return {
"name": getattr(coord, "name", None),
"update_interval": str(getattr(coord, "update_interval", None)),
"last_update_success": getattr(coord, "last_update_success", None),
"last_exception": repr(getattr(coord, "last_exception", None)),
"data_type": type(data).__name__,
"data_preview": preview,
}
def _stringify_key(key: Any) -> str:
if isinstance(key, tuple):
return ":".join(str(x) for x in key)
return str(key)
def _safe_coord_map(coord_map: Any) -> dict[str, Any]:
if not isinstance(coord_map, dict):
return {}
return {_stringify_key(k): _safe_coordinator_state(v) for k, v in coord_map.items()}
# ---------------------------
# Proxmox client access
# ---------------------------
async def _try_call(func: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any) -> Any:
return await func(*args, **kwargs)
async def _client_get_json(client: Any, path: str) -> Any:
"""
Best-effort JSON GET against the Proxmox client.
Tries multiple method names to stay compatible with different client styles.
"""
if client is None:
return None
if not path.startswith("/"):
path = "/" + path
candidates = []
for name in (
"get",
"api_get",
"request",
"api_request",
"_request",
"_api_request",
"get_json",
"async_get",
):
fn = getattr(client, name, None)
if callable(fn):
candidates.append((name, fn))
last_err: Exception | None = None
for name, fn in candidates:
try:
if name in ("request", "api_request", "_request", "_api_request"):
data = await _try_call(fn, "GET", path)
else:
data = await _try_call(fn, path)
# Unwrap {"data": ...} responses
if isinstance(data, dict) and "data" in data and len(data) == 1:
return data["data"]
return data
except Exception as err: # noqa: BLE001
last_err = err
continue
if last_err:
return {
"error": f"Could not query {path} via client "
f"({type(last_err).__name__}: {last_err})"
}
return {"error": f"Could not query {path} via client (no compatible method found)"}
# ---------------------------
# Proxmox data processing
# ---------------------------
def _count_resources(resources_data: Any) -> dict[str, int]:
counts = {
"nodes": 0,
"vms": 0,
"containers": 0,
"total_guests": 0,
}
if not isinstance(resources_data, list):
return counts
for r in resources_data:
if not isinstance(r, dict):
continue
r_type = r.get("type")
if r_type == "node":
counts["nodes"] += 1
elif r_type == "qemu":
counts["vms"] += 1
counts["total_guests"] += 1
elif r_type == "lxc":
counts["containers"] += 1
counts["total_guests"] += 1
return counts
def _extract_cluster_info(cluster_status: Any) -> dict[str, Any] | None:
if not isinstance(cluster_status, list):
return None
cluster_name = None
node_count = 0
for item in cluster_status:
if not isinstance(item, dict):
continue
if item.get("type") == "cluster":
cluster_name = item.get("name") or cluster_name
if item.get("type") == "node":
node_count += 1
return {
"name": cluster_name,
"nodes": node_count,
"raw_preview": cluster_status[:5],
}
# ---------------------------
# Main diagnostics entrypoint
# ---------------------------
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry (public-safe, JSON serializable)."""
domain_data = hass.data.get(DOMAIN, {}).get(entry.entry_id, {}) or {}
client = domain_data.get("client")
resources = domain_data.get("resources")
nodes = domain_data.get("nodes")
guest_coordinators = domain_data.get("guest_coordinators", {}) or {}
node_coordinators = domain_data.get("node_coordinators", {}) or {}
# ---- Entry data/options with focused redaction ----
entry_data = dict(entry.data)
# Mask secrets
for key in ("token_value", "password", "api_key", "secret"):
if key in entry_data:
entry_data[key] = _redact_secret(entry_data[key])
# Mask token_name per requirement
if "token_name" in entry_data:
entry_data["token_name"] = _mask_token_name(entry_data["token_name"])
# Host/IP should be masked
if "host" in entry_data and isinstance(entry_data["host"], str):
entry_data["host"] = _mask_ipv4_in_text(entry_data["host"])
options = dict(entry.options)
# ip_prefix may reveal network; mask if it looks like an IP-ish prefix
if "ip_prefix" in options and isinstance(options["ip_prefix"], str):
options["ip_prefix"] = _mask_ipv4_in_text(options["ip_prefix"])
# ---- Resource previews & counts ----
res_preview = None
res_counts = {"nodes": 0, "vms": 0, "containers": 0, "total_guests": 0}
try:
if resources and isinstance(resources.data, list):
res_counts = _count_resources(resources.data)
res_preview = [
{
"type": r.get("type"),
"node": r.get("node"),
"vmid": r.get("vmid"),
"name": r.get("name"),
"status": r.get("status"),
}
for r in resources.data[:25]
if isinstance(r, dict)
]
except Exception: # noqa: BLE001
res_preview = None
node_preview = None
try:
if nodes and isinstance(nodes.data, list):
node_preview = [
{
"node": n.get("node"),
"status": n.get("status"),
"uptime": n.get("uptime"),
"cpu": n.get("cpu"),
"mem": n.get("mem"),
"maxmem": n.get("maxmem"),
}
for n in nodes.data[:15]
if isinstance(n, dict)
]
except Exception: # noqa: BLE001
node_preview = None
# ---- Proxmox meta information ----
version_info = await _client_get_json(client, "/version")
cluster_status = await _client_get_json(client, "/cluster/status")
cluster_info = _extract_cluster_info(cluster_status)
proxmox_meta = {
"version": version_info,
"cluster": cluster_info,
"counts": res_counts,
}
result = {
"entry": {
"entry_id": entry.entry_id,
# Title can contain IPs (like "Proxmox 192.168.x.x") -> mask it
"title": _mask_ipv4_in_text(entry.title),
"data": entry_data,
"options": options,
},
"runtime": {
"client": {
"host": _mask_ipv4_in_text(str(getattr(client, "host", ""))) if client else None,
"port": getattr(client, "port", None) if client else None,
"verify_ssl": getattr(client, "verify_ssl", None) if client else None,
},
"scan_interval": domain_data.get("scan_interval"),
"ip_mode": domain_data.get("ip_mode"),
"ip_prefix": _mask_ipv4_in_text(str(domain_data.get("ip_prefix"))) if domain_data.get("ip_prefix") else None,
},
"proxmox": proxmox_meta,
"coordinators": {
"resources": _safe_coordinator_state(resources),
"nodes": _safe_coordinator_state(nodes),
"node_coordinators": _safe_coord_map(node_coordinators),
"guest_coordinators": _safe_coord_map(guest_coordinators),
},
"data_preview": {
"resources_preview": res_preview,
"nodes_preview": node_preview,
},
}
# FINAL PASS: mask any IPv4 that still appears anywhere (including cluster raw_preview "ip")
return _sanitize_public(result)