10 Commits

Author SHA1 Message Date
166c2f375e Add diagnostics for Proxmox config entries
Implement diagnostics for Proxmox configuration entries with public-safe sanitization for sensitive data.
2026-01-14 09:02:58 +01:00
3954d8f94f Update version to 0.7.5 in manifest.json 2026-01-14 09:02:01 +01:00
9fe21eb096 Update CHANGELOG for version 0.7.5
Added extended Diagnostics export with Proxmox version and cluster metadata. Ensured diagnostics are safe for public sharing by masking IP addresses and redacting API tokens.
2026-01-14 09:01:31 +01:00
f0e17bf90a Update version to 0.7.4 in manifest.json 2026-01-14 07:40:38 +01:00
c374740503 Add diagnostics export instructions to README
Added a diagnostics section to README for issue reporting.
2026-01-14 07:39:23 +01:00
d4cbf1d303 Add Home Assistant Diagnostics support
Added support for Home Assistant Diagnostics, including a new feature to download diagnostics for each Easy Proxmox config entry. Diagnostics include sanitized config entry data, runtime client information, and more, with sensitive data redacted.
2026-01-14 07:38:27 +01:00
39fc090944 Change to v0.7.3 2026-01-13 22:42:06 +01:00
8f8ac50905 Change version. Add Dokumentation Link to Readme. 2026-01-13 22:40:12 +01:00
66b0db4a6d Add 0.7.3 2026-01-13 22:38:40 +01:00
fea7f3c4af Aktualisieren von services.py 2026-01-13 22:16:53 +01:00
5 changed files with 448 additions and 38 deletions

View File

@@ -1,5 +1,36 @@
# Changelog
## 0.7.5
- Added extended Diagnostics export
- Includes Proxmox version information (`/version`)
- Includes cluster metadata (`/cluster/status`)
- Adds node, VM and container counts
- Diagnostics are safe for public sharing
- All IP addresses are masked
- API tokens remain redacted
- Token names are partially anonymized (first 2 + last 2 characters)
## 0.7.4
- Added Home Assistant Diagnostics support
- New “Download diagnostics” feature for each Easy Proxmox config entry
- Diagnostics include:
- Config entry data and options (sanitized)
- Runtime client information
- Coordinator states (last update success, exceptions, update interval)
- Safe previews of nodes and guests
- Sensitive data such as API tokens and credentials are automatically redacted
- Diagnostics are fully JSON-serializable and suitable for GitHub issue attachments
## 0.7.3
- Fixed service execution when using device targets in automations and scripts
- Services now work correctly on Home Assistant versions where `ServiceCall.target` is not available
- Improved target resolution:
- Supports `device_id` passed via UI targets and via service data
- Supports `entity_id` targets and automatically resolves them to the corresponding device
- Accepts both `str` and `list[str]` formats for target identifiers
- Fixed issue where service calls were accepted but no Proxmox action was executed
- Improved compatibility with the Home Assistant automation editor and mobile UI
## 0.7.2
- Fixed service validation for device targets:
- Home Assistant may pass `device_id` as a list (target/data wrapper)
@@ -128,3 +159,5 @@

View File

@@ -329,6 +329,14 @@ The API token has admin rights. Treat it like a root password:
| Buttons dont work | Check Proxmox permissions (PVEAdmin role) |
| Old devices remain | Fully cleaned up automatically since version 0.4.1 |
### Diagnostics
If you open an issue on GitHub, please attach a diagnostics export:
Settings → Devices & Services → Easy Proxmox → (⋮) → Download diagnostics
This export is automatically sanitized (API token is redacted).
---
## Support & Contributing

View File

@@ -0,0 +1,340 @@
from __future__ import annotations
import re
from typing import Any, Awaitable, Callable
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import DOMAIN
# ---------------------------
# Masking helpers
# ---------------------------
_IPV4_RE = re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b")
def _mask_ipv4(ip: str) -> str:
"""Mask IPv4 addresses (keep first two octets). Example: 192.168.178.101 -> 192.168.xxx.xxx"""
parts = ip.split(".")
if len(parts) != 4:
return ip
return f"{parts[0]}.{parts[1]}.xxx.xxx"
def _mask_ipv4_in_text(text: str) -> str:
"""Replace any IPv4 occurrences inside a string."""
return _IPV4_RE.sub(lambda m: _mask_ipv4(m.group(0)), text)
def _mask_token_name(value: Any) -> Any:
"""Show only first 2 and last 2 chars of token_name."""
if not isinstance(value, str):
return value
s = value.strip()
if len(s) <= 4:
return "*" * len(s) if s else s
return f"{s[:2]}***{s[-2:]}"
def _redact_secret(value: Any) -> Any:
"""Redact secrets while keeping structure intact."""
if value is None:
return None
if isinstance(value, str):
s = value.strip()
if not s:
return value
if len(s) <= 6:
return "***"
return s[:3] + "***" + s[-3:]
if isinstance(value, dict):
return {k: _redact_secret(v) for k, v in value.items()}
if isinstance(value, list):
return [_redact_secret(v) for v in value]
return value
def _sanitize_public(value: Any) -> Any:
"""
Public-safe sanitization:
- Mask all IPv4 strings anywhere
- Keep structure
"""
if value is None:
return None
if isinstance(value, str):
return _mask_ipv4_in_text(value)
if isinstance(value, dict):
return {str(k): _sanitize_public(v) for k, v in value.items()}
if isinstance(value, list):
return [_sanitize_public(v) for v in value]
if isinstance(value, tuple):
return [_sanitize_public(v) for v in value]
return value
# ---------------------------
# Coordinator helpers
# ---------------------------
def _safe_coordinator_state(coord: Any) -> dict[str, Any]:
if not coord:
return {}
data = getattr(coord, "data", None)
preview = None
if isinstance(data, list):
preview = data[:3]
return {
"name": getattr(coord, "name", None),
"update_interval": str(getattr(coord, "update_interval", None)),
"last_update_success": getattr(coord, "last_update_success", None),
"last_exception": repr(getattr(coord, "last_exception", None)),
"data_type": type(data).__name__,
"data_preview": preview,
}
def _stringify_key(key: Any) -> str:
if isinstance(key, tuple):
return ":".join(str(x) for x in key)
return str(key)
def _safe_coord_map(coord_map: Any) -> dict[str, Any]:
if not isinstance(coord_map, dict):
return {}
return {_stringify_key(k): _safe_coordinator_state(v) for k, v in coord_map.items()}
# ---------------------------
# Proxmox client access
# ---------------------------
async def _try_call(func: Callable[..., Awaitable[Any]], *args: Any, **kwargs: Any) -> Any:
return await func(*args, **kwargs)
async def _client_get_json(client: Any, path: str) -> Any:
"""
Best-effort JSON GET against the Proxmox client.
Tries multiple method names to stay compatible with different client styles.
"""
if client is None:
return None
if not path.startswith("/"):
path = "/" + path
candidates = []
for name in (
"get",
"api_get",
"request",
"api_request",
"_request",
"_api_request",
"get_json",
"async_get",
):
fn = getattr(client, name, None)
if callable(fn):
candidates.append((name, fn))
last_err: Exception | None = None
for name, fn in candidates:
try:
if name in ("request", "api_request", "_request", "_api_request"):
data = await _try_call(fn, "GET", path)
else:
data = await _try_call(fn, path)
# Unwrap {"data": ...} responses
if isinstance(data, dict) and "data" in data and len(data) == 1:
return data["data"]
return data
except Exception as err: # noqa: BLE001
last_err = err
continue
if last_err:
return {
"error": f"Could not query {path} via client "
f"({type(last_err).__name__}: {last_err})"
}
return {"error": f"Could not query {path} via client (no compatible method found)"}
# ---------------------------
# Proxmox data processing
# ---------------------------
def _count_resources(resources_data: Any) -> dict[str, int]:
counts = {
"nodes": 0,
"vms": 0,
"containers": 0,
"total_guests": 0,
}
if not isinstance(resources_data, list):
return counts
for r in resources_data:
if not isinstance(r, dict):
continue
r_type = r.get("type")
if r_type == "node":
counts["nodes"] += 1
elif r_type == "qemu":
counts["vms"] += 1
counts["total_guests"] += 1
elif r_type == "lxc":
counts["containers"] += 1
counts["total_guests"] += 1
return counts
def _extract_cluster_info(cluster_status: Any) -> dict[str, Any] | None:
if not isinstance(cluster_status, list):
return None
cluster_name = None
node_count = 0
for item in cluster_status:
if not isinstance(item, dict):
continue
if item.get("type") == "cluster":
cluster_name = item.get("name") or cluster_name
if item.get("type") == "node":
node_count += 1
return {
"name": cluster_name,
"nodes": node_count,
"raw_preview": cluster_status[:5],
}
# ---------------------------
# Main diagnostics entrypoint
# ---------------------------
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry (public-safe, JSON serializable)."""
domain_data = hass.data.get(DOMAIN, {}).get(entry.entry_id, {}) or {}
client = domain_data.get("client")
resources = domain_data.get("resources")
nodes = domain_data.get("nodes")
guest_coordinators = domain_data.get("guest_coordinators", {}) or {}
node_coordinators = domain_data.get("node_coordinators", {}) or {}
# ---- Entry data/options with focused redaction ----
entry_data = dict(entry.data)
# Mask secrets
for key in ("token_value", "password", "api_key", "secret"):
if key in entry_data:
entry_data[key] = _redact_secret(entry_data[key])
# Mask token_name per requirement
if "token_name" in entry_data:
entry_data["token_name"] = _mask_token_name(entry_data["token_name"])
# Host/IP should be masked
if "host" in entry_data and isinstance(entry_data["host"], str):
entry_data["host"] = _mask_ipv4_in_text(entry_data["host"])
options = dict(entry.options)
# ip_prefix may reveal network; mask if it looks like an IP-ish prefix
if "ip_prefix" in options and isinstance(options["ip_prefix"], str):
options["ip_prefix"] = _mask_ipv4_in_text(options["ip_prefix"])
# ---- Resource previews & counts ----
res_preview = None
res_counts = {"nodes": 0, "vms": 0, "containers": 0, "total_guests": 0}
try:
if resources and isinstance(resources.data, list):
res_counts = _count_resources(resources.data)
res_preview = [
{
"type": r.get("type"),
"node": r.get("node"),
"vmid": r.get("vmid"),
"name": r.get("name"),
"status": r.get("status"),
}
for r in resources.data[:25]
if isinstance(r, dict)
]
except Exception: # noqa: BLE001
res_preview = None
node_preview = None
try:
if nodes and isinstance(nodes.data, list):
node_preview = [
{
"node": n.get("node"),
"status": n.get("status"),
"uptime": n.get("uptime"),
"cpu": n.get("cpu"),
"mem": n.get("mem"),
"maxmem": n.get("maxmem"),
}
for n in nodes.data[:15]
if isinstance(n, dict)
]
except Exception: # noqa: BLE001
node_preview = None
# ---- Proxmox meta information ----
version_info = await _client_get_json(client, "/version")
cluster_status = await _client_get_json(client, "/cluster/status")
cluster_info = _extract_cluster_info(cluster_status)
proxmox_meta = {
"version": version_info,
"cluster": cluster_info,
"counts": res_counts,
}
result = {
"entry": {
"entry_id": entry.entry_id,
# Title can contain IPs (like "Proxmox 192.168.x.x") -> mask it
"title": _mask_ipv4_in_text(entry.title),
"data": entry_data,
"options": options,
},
"runtime": {
"client": {
"host": _mask_ipv4_in_text(str(getattr(client, "host", ""))) if client else None,
"port": getattr(client, "port", None) if client else None,
"verify_ssl": getattr(client, "verify_ssl", None) if client else None,
},
"scan_interval": domain_data.get("scan_interval"),
"ip_mode": domain_data.get("ip_mode"),
"ip_prefix": _mask_ipv4_in_text(str(domain_data.get("ip_prefix"))) if domain_data.get("ip_prefix") else None,
},
"proxmox": proxmox_meta,
"coordinators": {
"resources": _safe_coordinator_state(resources),
"nodes": _safe_coordinator_state(nodes),
"node_coordinators": _safe_coord_map(node_coordinators),
"guest_coordinators": _safe_coord_map(guest_coordinators),
},
"data_preview": {
"resources_preview": res_preview,
"nodes_preview": node_preview,
},
}
# FINAL PASS: mask any IPv4 that still appears anywhere (including cluster raw_preview "ip")
return _sanitize_public(result)

View File

@@ -1,11 +1,13 @@
{
"domain": "proxmox_pve",
"name": "Easy Proxmox (by René Bachmann)",
"version": "0.7.1",
"documentation": "https://example.invalid",
"version": "0.7.5",
"documentation": "https://github.com/bahmcloud/easy_proxmox ",
"requirements": ["aiohttp>=3.9.0"],
"codeowners": ["@BAHMCLOUD"],
"config_flow": true,
"iot_class": "local_polling",
"integration_type": "service"
}

View File

@@ -5,6 +5,7 @@ import voluptuous as vol
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers import entity_registry as er
from .const import DOMAIN
@@ -16,6 +17,7 @@ SERVICE_STOP_HARD = "stop_hard"
SERVICE_REBOOT = "reboot"
ATTR_DEVICE_ID = "device_id"
ATTR_ENTITY_ID = "entity_id"
ATTR_CONFIG_ENTRY_ID = "config_entry_id"
ATTR_HOST = "host"
ATTR_NODE = "node"
@@ -24,11 +26,11 @@ ATTR_TYPE = "type"
VALID_TYPES = ("qemu", "lxc")
# IMPORTANT:
# HA may pass device_id via target (list) OR data (list) depending on UI/script wrapper.
# Accept device_id passed as str or list[str] (depends on HA UI/script)
SERVICE_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_DEVICE_ID): vol.Any(str, [str]),
vol.Optional(ATTR_ENTITY_ID): vol.Any(str, [str]),
vol.Optional(ATTR_CONFIG_ENTRY_ID): str,
vol.Optional(ATTR_HOST): str,
vol.Optional(ATTR_NODE): str,
@@ -39,7 +41,6 @@ SERVICE_SCHEMA = vol.Schema(
def _first_str(value: Any) -> str | None:
"""Return first string from str or list[str], else None."""
if isinstance(value, str) and value.strip():
return value.strip()
if isinstance(value, list) and value:
@@ -49,25 +50,54 @@ def _first_str(value: Any) -> str | None:
return None
def _get_target_device_id(call: ServiceCall) -> str | None:
def _first_str_from_target(target: Any, key: str) -> str | None:
if isinstance(target, dict):
return _first_str(target.get(key))
return None
def _get_device_id(hass: HomeAssistant, call: ServiceCall) -> str | None:
"""
Prefer call.target.device_id (UI target).
Fallback to call.data.device_id (some wrappers convert target to data).
Robust device_id extraction across HA versions.
Priority:
1) call.target.device_id (newer HA)
2) call.data.device_id (some wrappers)
3) call.target.entity_id -> map to device_id
4) call.data.entity_id -> map to device_id
"""
if call.target and isinstance(call.target, dict):
dev_id = _first_str(call.target.get("device_id"))
target = getattr(call, "target", None)
# 1) target.device_id
dev_id = _first_str_from_target(target, "device_id")
if dev_id:
return dev_id
# fallback: data.device_id can be str or list[str]
return _first_str(call.data.get(ATTR_DEVICE_ID))
# 2) data.device_id
dev_id = _first_str(call.data.get(ATTR_DEVICE_ID))
if dev_id:
return dev_id
# 3) target.entity_id -> device_id
ent_id = _first_str_from_target(target, "entity_id")
if ent_id:
ent_reg = er.async_get(hass)
ent = ent_reg.async_get(ent_id)
if ent and ent.device_id:
return ent.device_id
# 4) data.entity_id -> device_id
ent_id = _first_str(call.data.get(ATTR_ENTITY_ID))
if ent_id:
ent_reg = er.async_get(hass)
ent = ent_reg.async_get(ent_id)
if ent and ent.device_id:
return ent.device_id
return None
def _parse_guest_identifier(identifier: str) -> Tuple[str, str, int]:
"""
Guest device identifier format: "node:type:vmid"
Example: "pve1:qemu:100"
"""
parts = identifier.split(":")
if len(parts) != 3:
raise ValueError(f"Invalid guest identifier: {identifier}")
@@ -79,8 +109,7 @@ def _parse_guest_identifier(identifier: str) -> Tuple[str, str, int]:
def _resolve_target(hass: HomeAssistant, call: ServiceCall) -> Tuple[str, str, int]:
"""Resolve node/type/vmid from device target OR node+vmid (+ optional type)."""
device_id = _get_target_device_id(call)
device_id = _get_device_id(hass, call)
node = call.data.get(ATTR_NODE)
vmid = call.data.get(ATTR_VMID)
vmtype = call.data.get(ATTR_TYPE, "qemu")
@@ -101,7 +130,7 @@ def _resolve_target(hass: HomeAssistant, call: ServiceCall) -> Tuple[str, str, i
raise ValueError(f"Selected device has no Easy Proxmox guest identifier: {device_id}")
if not node or vmid is None:
raise ValueError("Provide a Device target OR node + vmid (+ optional type/host/config_entry_id).")
raise ValueError("Provide a Device/Entity target OR node + vmid (+ optional type/host/config_entry_id).")
if vmtype not in VALID_TYPES:
raise ValueError(f"Invalid type: {vmtype} (allowed: {VALID_TYPES})")
@@ -171,17 +200,17 @@ def _pick_entry_id_by_guest_lookup(hass: HomeAssistant, node: str, vmtype: str,
if not matches:
raise ValueError(
f"Could not find guest {node}/{vmtype}/{vmid} in any configured Proxmox host. "
"Provide host or config_entry_id, or use device target."
"Provide host or config_entry_id, or use a Device/Entity target."
)
if len(matches) > 1:
raise ValueError(
f"Guest {node}/{vmtype}/{vmid} exists on multiple configured hosts (ambiguous). "
"Please provide host or config_entry_id, or use device target."
"Please provide host or config_entry_id, or use a Device/Entity target."
)
return matches[0]
def _resolve_entry_id(hass: HomeAssistant, call: ServiceCall, target: Tuple[str, str, int]) -> str:
def _resolve_entry_id(hass: HomeAssistant, call: ServiceCall, node: str, vmtype: str, vmid: int) -> str:
domain_entries = _get_domain_entries(hass)
config_entry_id = call.data.get(ATTR_CONFIG_ENTRY_ID)
@@ -190,7 +219,7 @@ def _resolve_entry_id(hass: HomeAssistant, call: ServiceCall, target: Tuple[str,
raise ValueError(f"config_entry_id '{config_entry_id}' not found or not loaded.")
return config_entry_id
device_id = _get_target_device_id(call)
device_id = _get_device_id(hass, call)
if device_id:
return _pick_entry_id_for_device(hass, device_id)
@@ -198,7 +227,6 @@ def _resolve_entry_id(hass: HomeAssistant, call: ServiceCall, target: Tuple[str,
if host:
return _pick_entry_id_by_host(hass, host)
node, vmtype, vmid = target
return _pick_entry_id_by_guest_lookup(hass, node, vmtype, vmid)
@@ -208,15 +236,14 @@ async def async_register_services(hass: HomeAssistant) -> None:
async def _call_action(call: ServiceCall, action: str) -> None:
node, vmtype, vmid = _resolve_target(hass, call)
entry_id = _resolve_entry_id(hass, call, (node, vmtype, vmid))
entry_id = _resolve_entry_id(hass, call, node, vmtype, vmid)
domain_entries = _get_domain_entries(hass)
entry_data = domain_entries.get(entry_id)
entry_data = _get_domain_entries(hass).get(entry_id)
if not isinstance(entry_data, dict) or not entry_data.get("client"):
raise ValueError(f"Selected config entry '{entry_id}' has no client (not loaded).")
client = entry_data["client"]
_LOGGER.debug("Service action=%s entry=%s target=%s/%s/%s", action, entry_id, node, vmtype, vmid)
_LOGGER.debug("Service action=%s entry=%s target=%s/%s/%s data=%s", action, entry_id, node, vmtype, vmid, call.data)
await client.guest_action(node=node, vmid=vmid, vmtype=vmtype, action=action)
async def handle_start(call: ServiceCall) -> None: