Files
HA-KNX-Bridge/custom_components/ha_knx_bridge/config_flow.py

687 lines
24 KiB
Python

from __future__ import annotations
import logging
import uuid
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_ENTITY_ID
from homeassistant.core import callback
from homeassistant.helpers import selector
from .const import (
CONF_ANGLE_ADDRESS,
CONF_ANGLE_STATE_ADDRESS,
CONF_COMMAND_ADDRESS,
CONF_INVERT_INCOMING,
CONF_INVERT_OUTGOING,
CONF_KNX_ENTRY_ID,
CONF_MOVE_LONG_ADDRESS,
CONF_MOVE_SHORT_ADDRESS,
CONF_PORTS,
CONF_PORT_ID,
CONF_POSITION_ADDRESS,
CONF_POSITION_STATE_ADDRESS,
CONF_STATE_ADDRESS,
CONF_STOP_ADDRESS,
DOMAIN,
)
# Module-level logger used by the flows and helpers in this file.
_LOGGER = logging.getLogger(__name__)
# Integration domain whose config entries are listed in the user step.
KNX_DOMAIN = "knx"
class HAKnxBridgeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
    """Config flow that binds the bridge to one existing KNX config entry."""

    VERSION = 1

    async def async_step_user(self, user_input: dict | None = None):
        """Let the user pick the KNX config entry to bridge.

        Aborts with ``knx_not_configured`` when no entry of the KNX domain
        exists. Once the form is submitted, an entry titled "HA KNX Bridge"
        is created that stores only the chosen KNX entry id.
        """
        available = self.hass.config_entries.async_entries(KNX_DOMAIN)
        if not available:
            return self.async_abort(reason="knx_not_configured")

        if user_input is None:
            # Fall back to the entry id as label when the entry has no title.
            choices = []
            for knx_entry in available:
                choices.append(
                    {
                        "value": knx_entry.entry_id,
                        "label": knx_entry.title or knx_entry.entry_id,
                    }
                )
            picker = selector.SelectSelector(
                selector.SelectSelectorConfig(options=choices, mode="dropdown")
            )
            form_schema = vol.Schema({vol.Required(CONF_KNX_ENTRY_ID): picker})
            return self.async_show_form(step_id="user", data_schema=form_schema)

        return self.async_create_entry(
            title="HA KNX Bridge",
            data={CONF_KNX_ENTRY_ID: user_input[CONF_KNX_ENTRY_ID]},
        )

    @staticmethod
    @callback
    def async_get_options_flow(config_entry):
        """Return the options flow handler for ``config_entry``."""
        return HAKnxBridgeOptionsFlow(config_entry)

    @staticmethod
    @callback
    def async_get_supported_subentry_types(config_entry):
        """Return the subentry types offered by this integration.

        Each port kind is wrapped in the descriptor class found by
        ``_get_subentry_type``; an empty mapping is returned when no such
        class is available.
        """
        wrapper = _get_subentry_type()
        if wrapper is None:
            _LOGGER.debug(
                "Config subentries are not supported in this Home Assistant version"
            )
            return {}

        # (subentry key, display name, flow class) in presentation order.
        port_flows = (
            ("binary_sensor", "Binary Sensor Port", BinarySensorPortSubentryFlow),
            ("switch", "Switch Port", SwitchPortSubentryFlow),
            ("cover", "Cover Port", CoverPortSubentryFlow),
        )
        return {
            key: wrapper(name=label, flow_class=flow)
            for key, label, flow in port_flows
        }
class HAKnxBridgeOptionsFlow(config_entries.OptionsFlow):
    """Options flow for adding, editing and removing bridged ports.

    Ports live as a list of dicts under ``CONF_PORTS`` in the entry options.
    Each dict carries an ``id``, a ``type`` ("binary_sensor", "switch" or
    "cover"), a display ``title`` and the validated form ``data``.
    """

    def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
        """Remember the config entry whose options are being edited."""
        self._config_entry = config_entry

    async def async_step_init(self, user_input: dict | None = None):
        """Show the menu of available port actions."""
        return self.async_show_menu(
            step_id="init",
            menu_options=[
                "add_binary_sensor",
                "add_switch",
                "add_cover",
                "edit_port",
                "remove_port",
            ],
        )

    async def async_step_add_binary_sensor(self, user_input: dict | None = None):
        """Add a binary sensor port."""
        return await self._async_add_port(
            "binary_sensor", user_input, _binary_sensor_schema
        )

    async def async_step_add_switch(self, user_input: dict | None = None):
        """Add a switch port."""
        return await self._async_add_port("switch", user_input, _switch_schema)

    async def async_step_add_cover(self, user_input: dict | None = None):
        """Add a cover port."""
        return await self._async_add_port("cover", user_input, _cover_schema)

    async def async_step_remove_port(self, user_input: dict | None = None):
        """Let the user pick a port and drop it from the stored list."""
        ports = self._stored_ports()
        if not ports:
            return self.async_abort(reason="no_ports_to_remove")
        if user_input is not None:
            port_id = user_input[CONF_PORT_ID]
            remaining = [port for port in ports if port.get("id") != port_id]
            return self.async_create_entry(title="", data={CONF_PORTS: remaining})
        return self.async_show_form(
            step_id="remove_port", data_schema=self._port_picker_schema(ports)
        )

    async def async_step_edit_port(self, user_input: dict | None = None):
        """Let the user pick a port, then show the edit form for its type."""
        ports = self._stored_ports()
        if not ports:
            return self.async_abort(reason="no_ports_to_edit")
        if user_input is not None and CONF_PORT_ID in user_input:
            port_id = user_input[CONF_PORT_ID]
            port = next((item for item in ports if item.get("id") == port_id), None)
            if port is None:
                return self.async_abort(reason="no_ports_to_edit")
            # The type-specific edit step reads the selection back from here.
            self.context["port_id"] = port_id
            port_type = port.get("type")
            # Resolved at call time: the schema helpers are defined further
            # down in the module than this class.
            schema_factories = {
                "binary_sensor": _binary_sensor_schema,
                "switch": _switch_schema,
                "cover": _cover_schema,
            }
            schema_factory = schema_factories.get(port_type)
            if schema_factory is None:
                return self.async_abort(reason="no_ports_to_edit")
            return self.async_show_form(
                step_id=f"edit_{port_type}",
                data_schema=schema_factory(defaults=port.get("data", {})),
            )
        return self.async_show_form(
            step_id="edit_port", data_schema=self._port_picker_schema(ports)
        )

    async def async_step_edit_binary_sensor(self, user_input: dict | None = None):
        """Handle the edit form of a binary sensor port."""
        return await self._async_edit_port(
            "binary_sensor", user_input, _binary_sensor_schema
        )

    async def async_step_edit_switch(self, user_input: dict | None = None):
        """Handle the edit form of a switch port."""
        return await self._async_edit_port("switch", user_input, _switch_schema)

    async def async_step_edit_cover(self, user_input: dict | None = None):
        """Handle the edit form of a cover port."""
        return await self._async_edit_port("cover", user_input, _cover_schema)

    def _stored_ports(self) -> list:
        """Return a new list holding the port dicts stored in the options.

        Only the list is copied; the port dicts are the stored objects
        themselves and must not be mutated.
        """
        return list(self._config_entry.options.get(CONF_PORTS, []))

    @staticmethod
    def _port_picker_schema(ports: list) -> vol.Schema:
        """Build a dropdown schema for choosing one of ``ports`` by id.

        Ports without an id are left out; the id doubles as the label when a
        port has no title.
        """
        options = [
            {
                "value": port.get("id"),
                "label": port.get("title", port.get("id")),
            }
            for port in ports
            if port.get("id")
        ]
        return vol.Schema(
            {
                vol.Required(CONF_PORT_ID): selector.SelectSelector(
                    selector.SelectSelectorConfig(options=options, mode="dropdown")
                )
            }
        )

    async def _async_add_port(self, port_type: str, user_input, schema_factory):
        """Shared implementation of the ``add_<port_type>`` steps.

        Shows the empty form first, validates the submitted group addresses
        and stores the port. On validation errors the form is shown again
        pre-filled with what the user entered, matching the edit steps.
        """
        step_id = f"add_{port_type}"
        if user_input is None:
            return self.async_show_form(step_id=step_id, data_schema=schema_factory())
        user_input, errors = _validate_knx_addresses(user_input, _port_keys(port_type))
        if errors:
            return self.async_show_form(
                step_id=step_id,
                data_schema=schema_factory(defaults=user_input),
                errors=errors,
            )
        return await self._async_store_port(port_type, user_input)

    async def _async_store_port(self, port_type: str, user_input: dict):
        """Append a new port with a fresh id and save the options."""
        ports = self._stored_ports()
        title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
        ports.append(
            {
                "id": uuid.uuid4().hex,
                "type": port_type,
                "title": title,
                "data": user_input,
            }
        )
        return self.async_create_entry(title="", data={CONF_PORTS: ports})

    async def _async_edit_port(self, port_type: str, user_input, schema_factory):
        """Shared implementation of the ``edit_<port_type>`` steps.

        The port being edited is identified by the ``port_id`` stored in the
        flow context by ``async_step_edit_port``. Aborts with
        ``no_ports_to_edit`` when that id is missing or no longer stored.
        """
        ports = self._stored_ports()
        port_id = self.context.get("port_id")
        if port_id is None:
            return self.async_abort(reason="no_ports_to_edit")
        port = next((item for item in ports if item.get("id") == port_id), None)
        if port is None:
            return self.async_abort(reason="no_ports_to_edit")
        step_id = f"edit_{port_type}"
        if user_input is None:
            return self.async_show_form(
                step_id=step_id,
                data_schema=schema_factory(defaults=port.get("data", {})),
            )
        user_input, errors = _validate_knx_addresses(user_input, _port_keys(port_type))
        if errors:
            return self.async_show_form(
                step_id=step_id,
                data_schema=schema_factory(defaults=user_input),
                errors=errors,
            )
        # Build a replacement dict instead of mutating ``port``: it is the
        # very object held in the entry's stored options, so changing it in
        # place would alter the saved options before the update is submitted
        # and can make the new options compare equal to the old ones.
        updated_port = {
            **port,
            "data": user_input,
            "title": _entity_title(self.hass, user_input[CONF_ENTITY_ID]),
        }
        new_ports = [updated_port if item is port else item for item in ports]
        return self.async_create_entry(title="", data={CONF_PORTS: new_ports})
class BinarySensorPortSubentryFlow(config_entries.ConfigSubentryFlow):
    """Subentry flow that creates a binary sensor port."""

    VERSION = 1

    async def async_step_user(self, user_input: dict | None = None):
        """Show the binary sensor form and create the subentry on submit.

        The state group address is validated and normalized first. The
        subentry is titled after the selected entity.
        """
        if user_input is None:
            return self.async_show_form(
                step_id="user", data_schema=_binary_sensor_schema()
            )
        user_input, errors = _validate_knx_addresses(
            user_input, [CONF_STATE_ADDRESS]
        )
        if errors:
            # Pre-fill the form with the submitted values so the user only
            # has to correct the rejected address.
            return self.async_show_form(
                step_id="user",
                data_schema=_binary_sensor_schema(defaults=user_input),
                errors=errors,
            )
        title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
        return self.async_create_entry(title=title, data=user_input)
class SwitchPortSubentryFlow(config_entries.ConfigSubentryFlow):
    """Subentry flow that creates a switch port."""

    VERSION = 1

    async def async_step_user(self, user_input: dict | None = None):
        """Show the switch form and create the subentry on submit.

        The command and state group addresses are validated and normalized
        first. The subentry is titled after the selected entity.
        """
        if user_input is None:
            return self.async_show_form(step_id="user", data_schema=_switch_schema())
        user_input, errors = _validate_knx_addresses(
            user_input, [CONF_COMMAND_ADDRESS, CONF_STATE_ADDRESS]
        )
        if errors:
            # Pre-fill the form with the submitted values so the user only
            # has to correct the rejected addresses.
            return self.async_show_form(
                step_id="user",
                data_schema=_switch_schema(defaults=user_input),
                errors=errors,
            )
        title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
        return self.async_create_entry(title=title, data=user_input)
class CoverPortSubentryFlow(config_entries.ConfigSubentryFlow):
    """Subentry flow that creates a cover port."""

    VERSION = 1

    async def async_step_user(self, user_input: dict | None = None):
        """Show the cover form and create the subentry on submit.

        All cover group addresses are validated and normalized first. The
        subentry is titled after the selected entity.
        """
        if user_input is None:
            return self.async_show_form(step_id="user", data_schema=_cover_schema())
        user_input, errors = _validate_knx_addresses(
            user_input,
            [
                CONF_MOVE_LONG_ADDRESS,
                CONF_MOVE_SHORT_ADDRESS,
                CONF_STOP_ADDRESS,
                CONF_POSITION_ADDRESS,
                CONF_POSITION_STATE_ADDRESS,
                CONF_ANGLE_ADDRESS,
                CONF_ANGLE_STATE_ADDRESS,
            ],
        )
        if errors:
            # Pre-fill the form with the submitted values so the user only
            # has to correct the rejected addresses.
            return self.async_show_form(
                step_id="user",
                data_schema=_cover_schema(defaults=user_input),
                errors=errors,
            )
        title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
        return self.async_create_entry(title=title, data=user_input)
def _entity_title(hass, entity_id: str) -> str:
    """Return a display title for ``entity_id``.

    Uses the ``friendly_name`` attribute of the entity's current state and
    falls back to the entity id when there is no state or no such attribute.
    """
    current = hass.states.get(entity_id)
    if current is not None:
        return current.attributes.get("friendly_name", entity_id)
    return entity_id
def _binary_sensor_schema(defaults: dict | None = None) -> vol.Schema:
    """Build the form schema for a binary sensor port.

    Fields, in order: the entity picker (binary_sensor domain), the state
    group address, and its incoming/outgoing invert flags. ``defaults``
    supplies the pre-filled values; missing entries default to an empty
    string (text fields) or ``False`` (flags).
    """
    values = defaults or {}
    fields = {
        vol.Required(
            CONF_ENTITY_ID, default=values.get(CONF_ENTITY_ID, "")
        ): selector.EntitySelector(
            selector.EntitySelectorConfig(domain=["binary_sensor"])
        ),
    }
    for address_key in (CONF_STATE_ADDRESS,):
        address_field = vol.Optional(
            address_key, default=values.get(address_key, "")
        )
        fields[address_field] = selector.TextSelector(
            selector.TextSelectorConfig(type="text")
        )
        # Each address is followed by its two invert toggles.
        for flag_key in (_invert_in_key(address_key), _invert_out_key(address_key)):
            flag_field = vol.Optional(flag_key, default=bool(values.get(flag_key)))
            fields[flag_field] = selector.BooleanSelector()
    return vol.Schema(fields)
def _cover_schema(defaults: dict | None = None) -> vol.Schema:
    """Build the form schema for a cover port.

    Fields, in order: the entity picker (cover domain), then for each cover
    group address a text field followed by its incoming/outgoing invert
    flags. ``defaults`` supplies the pre-filled values; missing entries
    default to an empty string (text fields) or ``False`` (flags).
    """
    values = defaults or {}
    fields = {
        vol.Required(
            CONF_ENTITY_ID, default=values.get(CONF_ENTITY_ID, "")
        ): selector.EntitySelector(
            selector.EntitySelectorConfig(domain=["cover"])
        ),
    }
    # Order here is the order in which the fields appear in the schema.
    address_keys = (
        CONF_MOVE_LONG_ADDRESS,
        CONF_MOVE_SHORT_ADDRESS,
        CONF_STOP_ADDRESS,
        CONF_POSITION_ADDRESS,
        CONF_POSITION_STATE_ADDRESS,
        CONF_ANGLE_ADDRESS,
        CONF_ANGLE_STATE_ADDRESS,
    )
    for address_key in address_keys:
        address_field = vol.Optional(
            address_key, default=values.get(address_key, "")
        )
        fields[address_field] = selector.TextSelector(
            selector.TextSelectorConfig(type="text")
        )
        # Each address is followed by its two invert toggles.
        for flag_key in (_invert_in_key(address_key), _invert_out_key(address_key)):
            flag_field = vol.Optional(flag_key, default=bool(values.get(flag_key)))
            fields[flag_field] = selector.BooleanSelector()
    return vol.Schema(fields)
def _switch_schema(defaults: dict | None = None) -> vol.Schema:
    """Build the form schema for a switch port.

    Fields, in order: the entity picker (switch domain), then the command
    and state group addresses, each followed by its incoming/outgoing invert
    flags. ``defaults`` supplies the pre-filled values; missing entries
    default to an empty string (text fields) or ``False`` (flags).
    """
    values = defaults or {}
    fields = {
        vol.Required(
            CONF_ENTITY_ID, default=values.get(CONF_ENTITY_ID, "")
        ): selector.EntitySelector(
            selector.EntitySelectorConfig(domain=["switch"])
        ),
    }
    for address_key in (CONF_COMMAND_ADDRESS, CONF_STATE_ADDRESS):
        address_field = vol.Optional(
            address_key, default=values.get(address_key, "")
        )
        fields[address_field] = selector.TextSelector(
            selector.TextSelectorConfig(type="text")
        )
        # Each address is followed by its two invert toggles.
        for flag_key in (_invert_in_key(address_key), _invert_out_key(address_key)):
            flag_field = vol.Optional(flag_key, default=bool(values.get(flag_key)))
            fields[flag_field] = selector.BooleanSelector()
    return vol.Schema(fields)
def _port_keys(port_type: str) -> list[str]:
    """Return the group-address option keys used by ``port_type``.

    Unknown port types yield an empty list. A new list is returned on every
    call.
    """
    keys_by_type = {
        "binary_sensor": (CONF_STATE_ADDRESS,),
        "switch": (CONF_COMMAND_ADDRESS, CONF_STATE_ADDRESS),
        "cover": (
            CONF_MOVE_LONG_ADDRESS,
            CONF_MOVE_SHORT_ADDRESS,
            CONF_STOP_ADDRESS,
            CONF_POSITION_ADDRESS,
            CONF_POSITION_STATE_ADDRESS,
            CONF_ANGLE_ADDRESS,
            CONF_ANGLE_STATE_ADDRESS,
        ),
    }
    return list(keys_by_type.get(port_type, ()))
def _validate_knx_addresses(
    user_input: dict, keys: list[str]
) -> tuple[dict, dict[str, str]]:
    """Normalize the group addresses found under ``keys`` in ``user_input``.

    Works on a copy of ``user_input``. For every listed key that is present:
    a ``None`` or blank value removes the address together with its two
    invert flags, a valid address is replaced by its normalized form, and an
    invalid one is left untouched and reported as ``"invalid_ga"``.

    Returns:
        A ``(cleaned_input, errors)`` tuple; ``errors`` maps field key to
        error code and is empty when everything validated.
    """
    result = dict(user_input)
    problems: dict[str, str] = {}

    def _discard(address_key: str) -> None:
        # An unset address makes its invert flags meaningless, drop them too.
        result.pop(address_key, None)
        result.pop(_invert_in_key(address_key), None)
        result.pop(_invert_out_key(address_key), None)

    for address_key in keys:
        if address_key not in result:
            continue
        raw = result[address_key]
        if raw is None:
            _discard(address_key)
            continue
        try:
            group_address = _normalize_group_address(str(raw))
        except ValueError:
            problems[address_key] = "invalid_ga"
            continue
        if group_address != "":
            result[address_key] = group_address
        else:
            _discard(address_key)
    return result, problems
def _normalize_group_address(value: str) -> str:
    """Return ``value`` as a canonical two- or three-level group address.

    Surrounding whitespace is ignored and a blank value yields ``""``.
    Accepted forms are ``main/sub`` (0-31 / 0-2047) and ``main/middle/sub``
    (0-31 / 0-7 / 0-255); the result is the parsed numbers joined by "/".

    Raises:
        ValueError: if the number of parts is neither two nor three, a part
            is not an integer, or a part is out of range.
    """
    stripped = value.strip()
    if not stripped:
        return ""

    pieces = stripped.split("/")
    # Upper bound of each part, keyed by the number of parts.
    upper_bounds = {2: (31, 2047), 3: (31, 7, 255)}.get(len(pieces))
    if upper_bounds is None:
        raise ValueError("invalid group address format")

    numbers = [_parse_int(piece) for piece in pieces]
    for number, upper in zip(numbers, upper_bounds):
        if number < 0 or number > upper:
            raise ValueError("group address out of range")
    return "/".join(str(number) for number in numbers)
def _parse_int(value: str) -> int:
    """Parse one part of a group address as a non-negative decimal integer.

    Surrounding whitespace is ignored. Only plain ASCII digits are accepted:
    bare ``int()`` would also take signs ("+1"), digit separators ("1_0")
    and non-ASCII digits, silently turning malformed input into a
    valid-looking address part.

    Raises:
        ValueError: if the part is empty or contains anything but digits.
    """
    text = value.strip()
    if text == "":
        raise ValueError("empty group address part")
    if not (text.isascii() and text.isdigit()):
        raise ValueError("group address part must be a decimal number")
    return int(text)
def _invert_in_key(address_key: str) -> str:
    """Return the option key of the incoming-invert flag for ``address_key``."""
    return "{}_{}".format(address_key, CONF_INVERT_INCOMING)
def _invert_out_key(address_key: str) -> str:
    """Return the option key of the outgoing-invert flag for ``address_key``."""
    return "{}_{}".format(address_key, CONF_INVERT_OUTGOING)
def _get_subentry_type():
    """Return the subentry descriptor class from ``homeassistant.config_entries``.

    Several candidate attribute names are probed in order and the first one
    that exists is returned. ``None`` is returned, after a debug message
    listing the module's attributes, when none of them is present.
    """
    # NOTE(review): the Home Assistant developer docs appear to describe
    # ``async_get_supported_subentry_types`` as returning flow classes
    # directly, without a wrapper type - confirm that any of these names
    # exists in the targeted Home Assistant versions.
    candidates = (
        "SubentryType",
        "ConfigEntrySubentryType",
        "ConfigSubentryType",
        "SubEntryType",
    )
    for name in candidates:
        subentry_type = getattr(config_entries, name, None)
        if subentry_type is not None:
            return subentry_type
    # Listing and sorting every module attribute is only useful for the
    # debug message, so skip that work unless debug logging is enabled.
    if _LOGGER.isEnabledFor(logging.DEBUG):
        _LOGGER.debug(
            "Subentry type class not found on homeassistant.config_entries. "
            "Available attrs: %s",
            ", ".join(sorted(dir(config_entries))),
        )
    return None