Files
HA-KNX-Bridge/custom_components/ha_knx_bridge/config_flow.py

374 lines
13 KiB
Python

from __future__ import annotations
import logging
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_ENTITY_ID
from homeassistant.core import callback
from homeassistant.helpers import selector
from .const import (
CONF_ANGLE_ADDRESS,
CONF_ANGLE_STATE_ADDRESS,
CONF_COMMAND_ADDRESS,
CONF_INVERT_INCOMING,
CONF_INVERT_OUTGOING,
CONF_KNX_ENTRY_ID,
CONF_MOVE_LONG_ADDRESS,
CONF_MOVE_SHORT_ADDRESS,
CONF_POSITION_ADDRESS,
CONF_POSITION_STATE_ADDRESS,
CONF_STATE_ADDRESS,
CONF_STOP_ADDRESS,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
KNX_DOMAIN = "knx"
class HAKnxBridgeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_user(self, user_input: dict | None = None):
knx_entries = self.hass.config_entries.async_entries(KNX_DOMAIN)
if not knx_entries:
return self.async_abort(reason="knx_not_configured")
if user_input is not None:
return self.async_create_entry(
title="HA KNX Bridge",
data={CONF_KNX_ENTRY_ID: user_input[CONF_KNX_ENTRY_ID]},
)
options = [
{"value": entry.entry_id, "label": entry.title or entry.entry_id}
for entry in knx_entries
]
schema = vol.Schema(
{
vol.Required(CONF_KNX_ENTRY_ID): selector.SelectSelector(
selector.SelectSelectorConfig(options=options, mode="dropdown")
)
}
)
return self.async_show_form(step_id="user", data_schema=schema)
@staticmethod
@callback
def async_get_options_flow(config_entry):
return HAKnxBridgeOptionsFlow(config_entry)
@staticmethod
@callback
def async_get_supported_subentry_types(config_entry):
subentry_type = _get_subentry_type()
if subentry_type is None:
_LOGGER.warning(
"Config subentries are not supported in this Home Assistant version"
)
return {}
return {
"binary_sensor": subentry_type(
name="Binary Sensor Port", flow_class=BinarySensorPortSubentryFlow
),
"switch": subentry_type(
name="Switch Port", flow_class=SwitchPortSubentryFlow
),
"cover": subentry_type(
name="Cover Port", flow_class=CoverPortSubentryFlow
),
}
class HAKnxBridgeOptionsFlow(config_entries.OptionsFlow):
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
self._config_entry = config_entry
async def async_step_init(self, user_input: dict | None = None):
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
schema = vol.Schema({})
return self.async_show_form(step_id="init", data_schema=schema)
class BinarySensorPortSubentryFlow(config_entries.ConfigSubentryFlow):
VERSION = 1
async def async_step_user(self, user_input: dict | None = None):
if user_input is not None:
user_input, errors = _validate_knx_addresses(
user_input,
[
CONF_STATE_ADDRESS,
],
)
if errors:
return self.async_show_form(
step_id="user", data_schema=_binary_sensor_schema(), errors=errors
)
title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
return self.async_create_entry(title=title, data=user_input)
return self.async_show_form(step_id="user", data_schema=_binary_sensor_schema())
class SwitchPortSubentryFlow(config_entries.ConfigSubentryFlow):
VERSION = 1
async def async_step_user(self, user_input: dict | None = None):
if user_input is not None:
user_input, errors = _validate_knx_addresses(
user_input, [CONF_COMMAND_ADDRESS, CONF_STATE_ADDRESS]
)
if errors:
return self.async_show_form(
step_id="user", data_schema=_switch_schema(), errors=errors
)
title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
return self.async_create_entry(title=title, data=user_input)
return self.async_show_form(step_id="user", data_schema=_switch_schema())
class CoverPortSubentryFlow(config_entries.ConfigSubentryFlow):
VERSION = 1
async def async_step_user(self, user_input: dict | None = None):
if user_input is not None:
user_input, errors = _validate_knx_addresses(
user_input,
[
CONF_MOVE_LONG_ADDRESS,
CONF_MOVE_SHORT_ADDRESS,
CONF_STOP_ADDRESS,
CONF_POSITION_ADDRESS,
CONF_POSITION_STATE_ADDRESS,
CONF_ANGLE_ADDRESS,
CONF_ANGLE_STATE_ADDRESS,
],
)
if errors:
return self.async_show_form(
step_id="user", data_schema=_cover_schema(), errors=errors
)
title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
return self.async_create_entry(title=title, data=user_input)
return self.async_show_form(step_id="user", data_schema=_cover_schema())
def _entity_title(hass, entity_id: str) -> str:
state = hass.states.get(entity_id)
if state is None:
return entity_id
return state.attributes.get("friendly_name", entity_id)
def _binary_sensor_schema() -> vol.Schema:
return vol.Schema(
{
vol.Required(CONF_ENTITY_ID): selector.EntitySelector(
selector.EntitySelectorConfig(domain=["binary_sensor"])
),
vol.Optional(CONF_STATE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_STATE_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_STATE_ADDRESS), default=False): (
selector.BooleanSelector()
),
}
)
def _cover_schema() -> vol.Schema:
return vol.Schema(
{
vol.Required(CONF_ENTITY_ID): selector.EntitySelector(
selector.EntitySelectorConfig(domain=["cover"])
),
vol.Optional(CONF_MOVE_LONG_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_MOVE_LONG_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_MOVE_LONG_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(CONF_MOVE_SHORT_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_MOVE_SHORT_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_MOVE_SHORT_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(CONF_STOP_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_STOP_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_STOP_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(CONF_POSITION_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_POSITION_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_POSITION_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(CONF_POSITION_STATE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_POSITION_STATE_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_POSITION_STATE_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(CONF_ANGLE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_ANGLE_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_ANGLE_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(CONF_ANGLE_STATE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_ANGLE_STATE_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_ANGLE_STATE_ADDRESS), default=False): (
selector.BooleanSelector()
),
}
)
def _switch_schema() -> vol.Schema:
return vol.Schema(
{
vol.Required(CONF_ENTITY_ID): selector.EntitySelector(
selector.EntitySelectorConfig(domain=["switch"])
),
vol.Optional(CONF_COMMAND_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_COMMAND_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_COMMAND_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(CONF_STATE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(_invert_in_key(CONF_STATE_ADDRESS), default=False): (
selector.BooleanSelector()
),
vol.Optional(_invert_out_key(CONF_STATE_ADDRESS), default=False): (
selector.BooleanSelector()
),
}
)
def _validate_knx_addresses(
user_input: dict, keys: list[str]
) -> tuple[dict, dict[str, str]]:
errors: dict[str, str] = {}
cleaned = dict(user_input)
for key in keys:
if key not in cleaned:
continue
value = cleaned.get(key)
if value is None:
cleaned.pop(key, None)
cleaned.pop(_invert_in_key(key), None)
cleaned.pop(_invert_out_key(key), None)
continue
try:
normalized = _normalize_group_address(str(value))
except ValueError:
errors[key] = "invalid_ga"
continue
if normalized == "":
cleaned.pop(key, None)
cleaned.pop(_invert_in_key(key), None)
cleaned.pop(_invert_out_key(key), None)
else:
cleaned[key] = normalized
return cleaned, errors
def _normalize_group_address(value: str) -> str:
text = value.strip()
if not text:
return ""
parts = text.split("/")
if len(parts) == 2:
main, sub = _parse_int(parts[0]), _parse_int(parts[1])
if not (0 <= main <= 31 and 0 <= sub <= 2047):
raise ValueError("group address out of range")
return f"{main}/{sub}"
if len(parts) == 3:
main, middle, sub = (
_parse_int(parts[0]),
_parse_int(parts[1]),
_parse_int(parts[2]),
)
if not (0 <= main <= 31 and 0 <= middle <= 7 and 0 <= sub <= 255):
raise ValueError("group address out of range")
return f"{main}/{middle}/{sub}"
raise ValueError("invalid group address format")
def _parse_int(value: str) -> int:
text = value.strip()
if text == "":
raise ValueError("empty group address part")
return int(text)
def _invert_in_key(address_key: str) -> str:
return f"{address_key}_{CONF_INVERT_INCOMING}"
def _invert_out_key(address_key: str) -> str:
return f"{address_key}_{CONF_INVERT_OUTGOING}"
def _get_subentry_type():
candidates = [
"SubentryType",
"ConfigEntrySubentryType",
"ConfigSubentryType",
"SubEntryType",
]
for name in candidates:
subentry_type = getattr(config_entries, name, None)
if subentry_type is not None:
return subentry_type
_LOGGER.debug(
"Subentry type class not found on homeassistant.config_entries. "
"Available attrs: %s",
", ".join(sorted(dir(config_entries))),
)
return None