from __future__ import annotations import logging import voluptuous as vol from homeassistant import config_entries from homeassistant.const import CONF_ENTITY_ID from homeassistant.core import callback from homeassistant.helpers import selector from .const import ( CONF_ANGLE_ADDRESS, CONF_ANGLE_STATE_ADDRESS, CONF_COMMAND_ADDRESS, CONF_INVERT_INCOMING, CONF_INVERT_OUTGOING, CONF_KNX_ENTRY_ID, CONF_MOVE_LONG_ADDRESS, CONF_MOVE_SHORT_ADDRESS, CONF_POSITION_ADDRESS, CONF_POSITION_STATE_ADDRESS, CONF_STATE_ADDRESS, CONF_STOP_ADDRESS, DOMAIN, ) _LOGGER = logging.getLogger(__name__) KNX_DOMAIN = "knx" _SUBENTRY_TYPE = getattr( config_entries, "SubentryType", getattr(config_entries, "ConfigEntrySubentryType", None) ) class HAKnxBridgeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): VERSION = 1 async def async_step_user(self, user_input: dict | None = None): knx_entries = self.hass.config_entries.async_entries(KNX_DOMAIN) if not knx_entries: return self.async_abort(reason="knx_not_configured") if user_input is not None: return self.async_create_entry( title="HA KNX Bridge", data={CONF_KNX_ENTRY_ID: user_input[CONF_KNX_ENTRY_ID]}, ) options = [ {"value": entry.entry_id, "label": entry.title or entry.entry_id} for entry in knx_entries ] schema = vol.Schema( { vol.Required(CONF_KNX_ENTRY_ID): selector.SelectSelector( selector.SelectSelectorConfig(options=options, mode="dropdown") ) } ) return self.async_show_form(step_id="user", data_schema=schema) @staticmethod @callback def async_get_options_flow(config_entry): return HAKnxBridgeOptionsFlow(config_entry) @staticmethod @callback def async_get_supported_subentry_types(config_entry): if _SUBENTRY_TYPE is None: _LOGGER.warning( "Config subentries are not supported in this Home Assistant version" ) return {} return { "binary_sensor": _SUBENTRY_TYPE( name="Binary Sensor Port", flow_class=BinarySensorPortSubentryFlow ), "switch": _SUBENTRY_TYPE( name="Switch Port", flow_class=SwitchPortSubentryFlow ), "cover": _SUBENTRY_TYPE( name="Cover Port", flow_class=CoverPortSubentryFlow ), } class HAKnxBridgeOptionsFlow(config_entries.OptionsFlow): def __init__(self, config_entry: config_entries.ConfigEntry) -> None: self._config_entry = config_entry async def async_step_init(self, user_input: dict | None = None): if user_input is not None: return self.async_create_entry(title="", data=user_input) schema = vol.Schema({}) return self.async_show_form(step_id="init", data_schema=schema) class BinarySensorPortSubentryFlow(config_entries.ConfigSubentryFlow): VERSION = 1 async def async_step_user(self, user_input: dict | None = None): if user_input is not None: user_input, errors = _validate_knx_addresses( user_input, [ CONF_STATE_ADDRESS, ], ) if errors: return self.async_show_form( step_id="user", data_schema=_binary_sensor_schema(), errors=errors ) title = _entity_title(self.hass, user_input[CONF_ENTITY_ID]) return self.async_create_entry(title=title, data=user_input) return self.async_show_form(step_id="user", data_schema=_binary_sensor_schema()) class SwitchPortSubentryFlow(config_entries.ConfigSubentryFlow): VERSION = 1 async def async_step_user(self, user_input: dict | None = None): if user_input is not None: user_input, errors = _validate_knx_addresses( user_input, [CONF_COMMAND_ADDRESS, CONF_STATE_ADDRESS] ) if errors: return self.async_show_form( step_id="user", data_schema=_switch_schema(), errors=errors ) title = _entity_title(self.hass, user_input[CONF_ENTITY_ID]) return self.async_create_entry(title=title, data=user_input) return self.async_show_form(step_id="user", data_schema=_switch_schema()) class CoverPortSubentryFlow(config_entries.ConfigSubentryFlow): VERSION = 1 async def async_step_user(self, user_input: dict | None = None): if user_input is not None: user_input, errors = _validate_knx_addresses( user_input, [ CONF_MOVE_LONG_ADDRESS, CONF_MOVE_SHORT_ADDRESS, CONF_STOP_ADDRESS, CONF_POSITION_ADDRESS, CONF_POSITION_STATE_ADDRESS, CONF_ANGLE_ADDRESS, CONF_ANGLE_STATE_ADDRESS, ], ) if errors: return self.async_show_form( step_id="user", data_schema=_cover_schema(), errors=errors ) title = _entity_title(self.hass, user_input[CONF_ENTITY_ID]) return self.async_create_entry(title=title, data=user_input) return self.async_show_form(step_id="user", data_schema=_cover_schema()) def _entity_title(hass, entity_id: str) -> str: state = hass.states.get(entity_id) if state is None: return entity_id return state.attributes.get("friendly_name", entity_id) def _binary_sensor_schema() -> vol.Schema: return vol.Schema( { vol.Required(CONF_ENTITY_ID): selector.EntitySelector( selector.EntitySelectorConfig(domain=["binary_sensor"]) ), vol.Optional(CONF_STATE_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_STATE_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_STATE_ADDRESS), default=False): ( selector.BooleanSelector() ), } ) def _cover_schema() -> vol.Schema: return vol.Schema( { vol.Required(CONF_ENTITY_ID): selector.EntitySelector( selector.EntitySelectorConfig(domain=["cover"]) ), vol.Optional(CONF_MOVE_LONG_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_MOVE_LONG_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_MOVE_LONG_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(CONF_MOVE_SHORT_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_MOVE_SHORT_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_MOVE_SHORT_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(CONF_STOP_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_STOP_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_STOP_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(CONF_POSITION_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_POSITION_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_POSITION_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(CONF_POSITION_STATE_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_POSITION_STATE_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_POSITION_STATE_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(CONF_ANGLE_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_ANGLE_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_ANGLE_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(CONF_ANGLE_STATE_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_ANGLE_STATE_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_ANGLE_STATE_ADDRESS), default=False): ( selector.BooleanSelector() ), } ) def _switch_schema() -> vol.Schema: return vol.Schema( { vol.Required(CONF_ENTITY_ID): selector.EntitySelector( selector.EntitySelectorConfig(domain=["switch"]) ), vol.Optional(CONF_COMMAND_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_COMMAND_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_COMMAND_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(CONF_STATE_ADDRESS): selector.TextSelector( selector.TextSelectorConfig(type="text") ), vol.Optional(_invert_in_key(CONF_STATE_ADDRESS), default=False): ( selector.BooleanSelector() ), vol.Optional(_invert_out_key(CONF_STATE_ADDRESS), default=False): ( selector.BooleanSelector() ), } ) def _validate_knx_addresses( user_input: dict, keys: list[str] ) -> tuple[dict, dict[str, str]]: errors: dict[str, str] = {} cleaned = dict(user_input) for key in keys: if key not in cleaned: continue value = cleaned.get(key) if value is None: cleaned.pop(key, None) cleaned.pop(_invert_in_key(key), None) cleaned.pop(_invert_out_key(key), None) continue try: normalized = _normalize_group_address(str(value)) except ValueError: errors[key] = "invalid_ga" continue if normalized == "": cleaned.pop(key, None) cleaned.pop(_invert_in_key(key), None) cleaned.pop(_invert_out_key(key), None) else: cleaned[key] = normalized return cleaned, errors def _normalize_group_address(value: str) -> str: text = value.strip() if not text: return "" parts = text.split("/") if len(parts) == 2: main, sub = _parse_int(parts[0]), _parse_int(parts[1]) if not (0 <= main <= 31 and 0 <= sub <= 2047): raise ValueError("group address out of range") return f"{main}/{sub}" if len(parts) == 3: main, middle, sub = ( _parse_int(parts[0]), _parse_int(parts[1]), _parse_int(parts[2]), ) if not (0 <= main <= 31 and 0 <= middle <= 7 and 0 <= sub <= 255): raise ValueError("group address out of range") return f"{main}/{middle}/{sub}" raise ValueError("invalid group address format") def _parse_int(value: str) -> int: text = value.strip() if text == "": raise ValueError("empty group address part") return int(text) def _invert_in_key(address_key: str) -> str: return f"{address_key}_{CONF_INVERT_INCOMING}" def _invert_out_key(address_key: str) -> str: return f"{address_key}_{CONF_INVERT_OUTGOING}"