Dateien nach "custom_components/proxmox_pve" hochladen

This commit is contained in:
2026-01-14 15:52:23 +00:00
parent 6993939654
commit edc09c4cab
5 changed files with 555 additions and 0 deletions

View File

@@ -0,0 +1,156 @@
import logging
from datetime import timedelta
import aiohttp
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .api import ProxmoxClient
from .const import (
DOMAIN,
PLATFORMS,
CONF_SCAN_INTERVAL,
CONF_IP_MODE,
CONF_IP_PREFIX,
DEFAULT_SCAN_INTERVAL,
DEFAULT_IP_MODE,
DEFAULT_IP_PREFIX,
)
from .coordinator import ProxmoxResourcesCoordinator, ProxmoxNodesCoordinator
from .services import async_register_services, async_unregister_services
_LOGGER = logging.getLogger(__name__)
def _opt(entry: ConfigEntry, key: str, default):
return entry.options.get(key, default)
async def _apply_options_now(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Apply updated options to all coordinators without reload/restart."""
data = hass.data[DOMAIN][entry.entry_id]
new_scan_interval = int(_opt(entry, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL))
new_ip_mode = str(_opt(entry, CONF_IP_MODE, DEFAULT_IP_MODE))
new_ip_prefix = str(_opt(entry, CONF_IP_PREFIX, DEFAULT_IP_PREFIX))
data["scan_interval"] = new_scan_interval
data["ip_mode"] = new_ip_mode
data["ip_prefix"] = new_ip_prefix
td = timedelta(seconds=new_scan_interval)
resources = data.get("resources")
if resources:
resources.update_interval = td
nodes = data.get("nodes")
if nodes:
nodes.update_interval = td
for node_coord in (data.get("node_coordinators") or {}).values():
node_coord.update_interval = td
for guest_coord in (data.get("guest_coordinators") or {}).values():
guest_coord.update_interval = td
guest_coord.ip_mode = new_ip_mode
guest_coord.ip_prefix = new_ip_prefix
# trigger refresh so UI updates quickly
tasks = []
if resources:
tasks.append(resources.async_request_refresh())
if nodes:
tasks.append(nodes.async_request_refresh())
for node_coord in (data.get("node_coordinators") or {}).values():
tasks.append(node_coord.async_request_refresh())
for guest_coord in (data.get("guest_coordinators") or {}).values():
tasks.append(guest_coord.async_request_refresh())
for t in tasks:
hass.async_create_task(t)
_LOGGER.debug(
"Applied options live for %s: scan_interval=%s ip_mode=%s ip_prefix=%s",
entry.entry_id,
new_scan_interval,
new_ip_mode,
new_ip_prefix,
)
async def _update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
await _apply_options_now(hass, entry)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
session = aiohttp.ClientSession()
client = ProxmoxClient(
host=entry.data["host"],
port=entry.data["port"],
token_name=entry.data["token_name"],
token_value=entry.data["token_value"],
verify_ssl=entry.data["verify_ssl"],
session=session,
)
scan_interval = int(_opt(entry, CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL))
ip_mode = str(_opt(entry, CONF_IP_MODE, DEFAULT_IP_MODE))
ip_prefix = str(_opt(entry, CONF_IP_PREFIX, DEFAULT_IP_PREFIX))
resources = ProxmoxResourcesCoordinator(hass, client, scan_interval=scan_interval)
nodes = ProxmoxNodesCoordinator(hass, client, scan_interval=scan_interval)
await resources.async_config_entry_first_refresh()
await nodes.async_config_entry_first_refresh()
hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][entry.entry_id] = {
"session": session,
"client": client,
"resources": resources,
"nodes": nodes,
"scan_interval": scan_interval,
"ip_mode": ip_mode,
"ip_prefix": ip_prefix,
"guest_coordinators": {},
"node_coordinators": {},
"platform_cache": {},
}
# Register services once
await async_register_services(hass)
# Apply option updates live
entry.async_on_unload(entry.add_update_listener(_update_listener))
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
_LOGGER.debug("Set up proxmox_pve entry %s", entry.entry_id)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
data = hass.data.get(DOMAIN, {}).pop(entry.entry_id, None)
if data:
pc = data.get("platform_cache") or {}
for key in ("sensor_unsub", "switch_unsub", "button_unsub"):
for unsub in pc.get(key, []):
try:
unsub()
except Exception:
pass
if data.get("session"):
await data["session"].close()
# If no entries remain, unregister services
if not hass.data.get(DOMAIN):
await async_unregister_services(hass)
return unload_ok

View File

@@ -0,0 +1,64 @@
import asyncio
from dataclasses import dataclass
from typing import Any
import aiohttp
class ProxmoxApiError(Exception):
"""Raised for Proxmox API errors."""
@dataclass
class ProxmoxClient:
host: str
port: int
token_name: str # "USER@REALM!TOKENID"
token_value: str
verify_ssl: bool
session: aiohttp.ClientSession
@property
def base_url(self) -> str:
return f"https://{self.host}:{self.port}/api2/json"
def _headers(self) -> dict[str, str]:
return {"Authorization": f"PVEAPIToken={self.token_name}={self.token_value}"}
async def _request(self, method: str, path: str, **kwargs) -> Any:
url = f"{self.base_url}{path}"
kwargs.setdefault("headers", {})
kwargs["headers"].update(self._headers())
ssl = None if self.verify_ssl else False
try:
async with self.session.request(method, url, ssl=ssl, **kwargs) as resp:
text = await resp.text()
if resp.status >= 400:
raise ProxmoxApiError(f"HTTP {resp.status} calling {path}: {text}")
payload = await resp.json()
return payload.get("data")
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
raise ProxmoxApiError(str(e)) from e
async def test_connection(self) -> None:
await self._request("GET", "/version")
async def list_cluster_resources(self) -> list[dict[str, Any]]:
return await self._request("GET", "/cluster/resources", params={"type": "vm"})
async def list_nodes(self) -> list[dict[str, Any]]:
return await self._request("GET", "/nodes")
async def get_node_status(self, node: str) -> dict[str, Any]:
return await self._request("GET", f"/nodes/{node}/status")
async def guest_action(self, node: str, vmid: int, vmtype: str, action: str) -> Any:
return await self._request("POST", f"/nodes/{node}/{vmtype}/{vmid}/status/{action}")
async def get_guest_status_current(self, node: str, vmid: int, vmtype: str) -> dict[str, Any]:
return await self._request("GET", f"/nodes/{node}/{vmtype}/{vmid}/status/current")
async def get_qemu_agent_network_ifaces(self, node: str, vmid: int) -> dict[str, Any]:
return await self._request("GET", f"/nodes/{node}/qemu/{vmid}/agent/network-get-interfaces")

View File

@@ -0,0 +1,200 @@
import asyncio
import logging
from homeassistant.components.button import ButtonEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import ProxmoxGuestCoordinator
_LOGGER = logging.getLogger(__name__)
def _guest_display_name(resource: dict) -> str:
name = resource.get("name") or f"{resource.get('type')} {resource.get('vmid')}"
return f"{name} (VMID {resource.get('vmid')})"
def _guest_id(resource: dict) -> str:
return f"{resource.get('node')}:{resource.get('type')}:{resource.get('vmid')}"
def _guest_key(resource: dict) -> tuple[str, str, int]:
return (resource["node"], resource["type"], int(resource["vmid"]))
def _model_for(resource: dict) -> str:
return "Virtual Machine" if resource.get("type") == "qemu" else "Container"
async def _get_guest_coordinator(hass: HomeAssistant, entry: ConfigEntry, resource: dict) -> ProxmoxGuestCoordinator:
data = hass.data[DOMAIN][entry.entry_id]
key = _guest_key(resource)
if key in data["guest_coordinators"]:
return data["guest_coordinators"][key]
coord = ProxmoxGuestCoordinator(
hass=hass,
client=data["client"],
node=key[0],
vmtype=key[1],
vmid=key[2],
scan_interval=int(data["scan_interval"]),
ip_mode=str(data["ip_mode"]),
ip_prefix=str(data["ip_prefix"]),
)
data["guest_coordinators"][key] = coord
hass.async_create_task(coord.async_config_entry_first_refresh())
return coord
def _update_device_name(hass: HomeAssistant, guest_id: str, new_name: str, model: str) -> None:
dev_reg = dr.async_get(hass)
device = dev_reg.async_get_device(identifiers={(DOMAIN, guest_id)})
if device and (device.name != new_name or device.model != model):
dev_reg.async_update_device(device.id, name=new_name, model=model)
async def _purge_guest_entity_registry(hass: HomeAssistant, entry: ConfigEntry, guest_id: str) -> None:
ent_reg = er.async_get(hass)
prefix = f"{entry.entry_id}_{guest_id}_"
to_remove: list[str] = []
for entity_id, ent in ent_reg.entities.items():
if ent.config_entry_id != entry.entry_id:
continue
if ent.unique_id and ent.unique_id.startswith(prefix):
to_remove.append(entity_id)
for entity_id in to_remove:
ent_reg.async_remove(entity_id)
await asyncio.sleep(0)
async def _remove_device(hass: HomeAssistant, guest_id: str) -> None:
dev_reg = dr.async_get(hass)
device = dev_reg.async_get_device(identifiers={(DOMAIN, guest_id)})
if device:
dev_reg.async_remove_device(device.id)
class _BaseGuestButton(CoordinatorEntity, ButtonEntity):
def __init__(self, coordinator, entry: ConfigEntry, resource: dict) -> None:
super().__init__(coordinator)
self._entry = entry
self._resource = dict(resource)
def update_resource(self, resource: dict) -> None:
self._resource = dict(resource)
@property
def device_info(self):
return {
"identifiers": {(DOMAIN, _guest_id(self._resource))},
"name": _guest_display_name(self._resource),
"manufacturer": "Proxmox VE",
"model": _model_for(self._resource),
}
@property
def extra_state_attributes(self):
return {"vmid": self._resource.get("vmid"), "node": self._resource.get("node"), "type": self._resource.get("type")}
class ProxmoxGuestRebootButton(_BaseGuestButton):
_attr_icon = "mdi:restart"
def __init__(self, coordinator, entry: ConfigEntry, resource: dict) -> None:
super().__init__(coordinator, entry, resource)
self._attr_unique_id = f"{entry.entry_id}_{_guest_id(resource)}_reboot"
@property
def name(self) -> str:
return f"{_guest_display_name(self._resource)} Reboot"
async def async_press(self) -> None:
client = self.hass.data[DOMAIN][self._entry.entry_id]["client"]
await client.guest_action(self._resource["node"], int(self._resource["vmid"]), self._resource["type"], "reboot")
await self.coordinator.async_request_refresh()
class ProxmoxGuestHardStopButton(_BaseGuestButton):
_attr_icon = "mdi:stop-circle"
def __init__(self, coordinator, entry: ConfigEntry, resource: dict) -> None:
super().__init__(coordinator, entry, resource)
self._attr_unique_id = f"{entry.entry_id}_{_guest_id(resource)}_stop_hard"
@property
def name(self) -> str:
return f"{_guest_display_name(self._resource)} Stop (hard)"
async def async_press(self) -> None:
client = self.hass.data[DOMAIN][self._entry.entry_id]["client"]
await client.guest_action(self._resource["node"], int(self._resource["vmid"]), self._resource["type"], "stop")
await self.coordinator.async_request_refresh()
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback) -> None:
data = hass.data[DOMAIN][entry.entry_id]
resources_coord = data["resources"]
platform_cache = data.setdefault("platform_cache", {})
cache: dict[tuple[str, str, int], list[ButtonEntity]] = platform_cache.setdefault("button", {})
async def _sync() -> None:
resources = resources_coord.data or []
current: dict[tuple[str, str, int], dict] = {}
for r in resources:
if r.get("type") not in ("qemu", "lxc"):
continue
if r.get("node") is None or r.get("vmid") is None:
continue
current[_guest_key(r)] = r
# update
for key, r in current.items():
if key not in cache:
continue
gid = _guest_id(r)
_update_device_name(hass, gid, _guest_display_name(r), _model_for(r))
for ent in cache[key]:
ent.update_resource(r)
ent.async_write_ha_state()
# add
new_entities: list[ButtonEntity] = []
for key, r in current.items():
if key in cache:
continue
guest_coord = await _get_guest_coordinator(hass, entry, r)
ents = [ProxmoxGuestRebootButton(guest_coord, entry, r), ProxmoxGuestHardStopButton(guest_coord, entry, r)]
cache[key] = ents
new_entities.extend(ents)
if new_entities:
async_add_entities(new_entities, update_before_add=False)
# remove (hard cleanup)
removed = [k for k in list(cache.keys()) if k not in current]
for k in removed:
gid = f"{k[0]}:{k[1]}:{k[2]}"
for ent in cache[k]:
await ent.async_remove()
del cache[k]
data["guest_coordinators"].pop(k, None)
await _purge_guest_entity_registry(hass, entry, gid)
await _remove_device(hass, gid)
await _sync()
unsub = resources_coord.async_add_listener(lambda: hass.async_create_task(_sync()))
platform_cache.setdefault("button_unsub", []).append(unsub)

View File

@@ -0,0 +1,108 @@
import aiohttp
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.data_entry_flow import FlowResult
from .api import ProxmoxClient, ProxmoxApiError
from .const import (
DOMAIN,
CONF_HOST,
CONF_PORT,
CONF_VERIFY_SSL,
CONF_TOKEN_NAME,
CONF_TOKEN_VALUE,
DEFAULT_PORT,
DEFAULT_VERIFY_SSL,
# options
CONF_SCAN_INTERVAL,
CONF_IP_MODE,
CONF_IP_PREFIX,
DEFAULT_SCAN_INTERVAL,
DEFAULT_IP_MODE,
DEFAULT_IP_PREFIX,
IP_MODE_PREFER_192168,
IP_MODE_PREFER_PRIVATE,
IP_MODE_ANY,
IP_MODE_CUSTOM_PREFIX,
)
STEP_USER_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Optional(CONF_PORT, default=DEFAULT_PORT): vol.Coerce(int),
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): bool,
vol.Required(CONF_TOKEN_NAME): str,
vol.Required(CONF_TOKEN_VALUE): str,
}
)
async def _validate_input(data: dict) -> None:
async with aiohttp.ClientSession() as session:
client = ProxmoxClient(
host=data[CONF_HOST],
port=int(data[CONF_PORT]),
token_name=data[CONF_TOKEN_NAME],
token_value=data[CONF_TOKEN_VALUE],
verify_ssl=bool(data[CONF_VERIFY_SSL]),
session=session,
)
await client.test_connection()
def _options_schema(current: dict) -> vol.Schema:
ip_modes = [IP_MODE_PREFER_192168, IP_MODE_PREFER_PRIVATE, IP_MODE_ANY, IP_MODE_CUSTOM_PREFIX]
return vol.Schema(
{
vol.Required(CONF_SCAN_INTERVAL, default=int(current.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL))): vol.All(
vol.Coerce(int), vol.Range(min=5, max=3600)
),
vol.Required(CONF_IP_MODE, default=current.get(CONF_IP_MODE, DEFAULT_IP_MODE)): vol.In(ip_modes),
vol.Required(CONF_IP_PREFIX, default=current.get(CONF_IP_PREFIX, DEFAULT_IP_PREFIX)): str,
}
)
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_user(self, user_input=None) -> FlowResult:
errors = {}
if user_input is not None:
unique = f"{user_input[CONF_HOST]}:{user_input[CONF_PORT]}:{user_input[CONF_TOKEN_NAME]}"
await self.async_set_unique_id(unique)
self._abort_if_unique_id_configured()
try:
await _validate_input(user_input)
except ProxmoxApiError:
errors["base"] = "cannot_connect"
except Exception:
errors["base"] = "unknown"
else:
title = f"Proxmox {user_input[CONF_HOST]}"
return self.async_create_entry(title=title, data=user_input)
return self.async_show_form(step_id="user", data_schema=STEP_USER_SCHEMA, errors=errors)
@staticmethod
def async_get_options_flow(config_entry: config_entries.ConfigEntry):
# IMPORTANT: Do not store config_entry on OptionsFlowHandler as attribute named config_entry.
return OptionsFlowHandler()
class OptionsFlowHandler(config_entries.OptionsFlow):
async def async_step_init(self, user_input=None) -> FlowResult:
# Fetch the config entry safely (works across HA versions)
entry_id = self.context.get("entry_id")
config_entry = self.hass.config_entries.async_get_entry(entry_id) if entry_id else None
current = dict(config_entry.options) if config_entry else {}
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
return self.async_show_form(step_id="init", data_schema=_options_schema(current))

View File

@@ -0,0 +1,27 @@
DOMAIN = "proxmox_pve"
CONF_HOST = "host"
CONF_PORT = "port"
CONF_VERIFY_SSL = "verify_ssl"
CONF_TOKEN_NAME = "token_name"
CONF_TOKEN_VALUE = "token_value"
# Options
CONF_SCAN_INTERVAL = "scan_interval"
CONF_IP_MODE = "ip_mode"
CONF_IP_PREFIX = "ip_prefix"
DEFAULT_PORT = 8006
DEFAULT_VERIFY_SSL = True
DEFAULT_SCAN_INTERVAL = 20
IP_MODE_PREFER_192168 = "prefer_192168"
IP_MODE_PREFER_PRIVATE = "prefer_private"
IP_MODE_ANY = "any"
IP_MODE_CUSTOM_PREFIX = "custom_prefix"
DEFAULT_IP_MODE = IP_MODE_PREFER_192168
DEFAULT_IP_PREFIX = "192.168."
PLATFORMS = ["sensor", "switch", "button"]