"""Bridge Home Assistant entities to KNX group addresses.

State changes of configured entities are forwarded to KNX through the
``knx.send`` service, and incoming ``knx_event`` telegrams addressed to a
configured group address are translated into ``switch``/``cover`` service
calls.
"""

from __future__ import annotations

from collections.abc import Callable, Coroutine, Mapping
from dataclasses import dataclass
import logging
from typing import Any

from homeassistant.config_entries import ConfigEntry
from homeassistant.core import Event, HomeAssistant, State
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import event as event_helper

from .const import (
    ADDRESS_VALUE_TYPE,
    CONF_ANGLE_ADDRESS,
    CONF_ANGLE_STATE_ADDRESS,
    CONF_COMMAND_ADDRESS,
    CONF_INVERT_INCOMING,
    CONF_INVERT_OUTGOING,
    CONF_MOVE_LONG_ADDRESS,
    CONF_MOVE_SHORT_ADDRESS,
    CONF_PORTS,
    CONF_POSITION_ADDRESS,
    CONF_POSITION_STATE_ADDRESS,
    CONF_STATE_ADDRESS,
    CONF_STOP_ADDRESS,
)

_LOGGER = logging.getLogger(__name__)

KNX_DOMAIN = "knx"

# (field prefix / action name, config key) for cover addresses that receive
# commands from KNX.  The prefix doubles as the action name passed to
# ``BridgeManager._handle_cover_action``.
_COVER_INCOMING_ACTIONS: tuple[tuple[str, str], ...] = (
    ("move_long", CONF_MOVE_LONG_ADDRESS),
    ("move_short", CONF_MOVE_SHORT_ADDRESS),
    ("stop", CONF_STOP_ADDRESS),
    ("position", CONF_POSITION_ADDRESS),
    ("angle", CONF_ANGLE_ADDRESS),
)

# (field prefix, config key) for cover addresses that report state to KNX.
_COVER_STATE_FIELDS: tuple[tuple[str, str], ...] = (
    ("position_state", CONF_POSITION_STATE_ADDRESS),
    ("angle_state", CONF_ANGLE_STATE_ADDRESS),
)


@dataclass(frozen=True)
class BinarySensorPort:
    """Configuration of a binary sensor whose on/off state is sent to KNX."""

    entity_id: str
    state_address: str | None
    state_invert_incoming: bool
    state_invert_outgoing: bool


@dataclass(frozen=True)
class CoverPort:
    """Configuration of a cover entity bridged to KNX.

    Every group address comes with its own pair of invert flags, one for
    telegrams received from KNX and one for values sent to KNX.
    """

    entity_id: str
    move_long_address: str | None
    move_long_invert_incoming: bool
    move_long_invert_outgoing: bool
    move_short_address: str | None
    move_short_invert_incoming: bool
    move_short_invert_outgoing: bool
    stop_address: str | None
    stop_invert_incoming: bool
    stop_invert_outgoing: bool
    position_address: str | None
    position_invert_incoming: bool
    position_invert_outgoing: bool
    position_state_address: str | None
    position_state_invert_incoming: bool
    position_state_invert_outgoing: bool
    angle_address: str | None
    angle_invert_incoming: bool
    angle_invert_outgoing: bool
    angle_state_address: str | None
    angle_state_invert_incoming: bool
    angle_state_invert_outgoing: bool


@dataclass(frozen=True)
class SwitchPort:
    """Configuration of a switch entity bridged to KNX."""

    entity_id: str
    command_address: str | None
    state_address: str | None
    command_invert_incoming: bool
    command_invert_outgoing: bool
    state_invert_incoming: bool
    state_invert_outgoing: bool


@dataclass(frozen=True)
class AddressOptions:
    """Per-address settings consulted by ``_invert_value``."""

    value_type: str | None
    invert_incoming: bool
    invert_outgoing: bool


class BridgeManager:
    """Wire the ports of one config entry to the KNX integration."""

    def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
        """Store the Home Assistant instance and entry; nothing is registered yet."""
        self.hass = hass
        self.entry = entry
        # Unsubscribe callbacks of the entity state-change listeners.
        self._unsub_listeners: list[Callable[[], None]] = []
        # Unsubscribe callback of the single ``knx_event`` listener, if any.
        self._knx_event_unsub: Callable[[], None] | None = None
        # Group address -> coroutine function handling telegrams sent to it.
        self._address_handlers: dict[
            str, Callable[[Event], Coroutine[Any, Any, None]]
        ] = {}
        # (address, value type) pairs passed to ``knx.event_register``.
        self._registered_addresses: list[tuple[str, str | None]] = []
        # Group address -> value type and invert flags.
        self._address_options: dict[str, AddressOptions] = {}

    async def async_setup(self) -> None:
        """Load the configured ports and register all listeners.

        Raises:
            ConfigEntryNotReady: the ``knx.send`` service is not registered.
        """
        if not self.hass.services.has_service(KNX_DOMAIN, "send"):
            raise ConfigEntryNotReady("KNX integration services not available")

        binary_ports, switch_ports, cover_ports = self._load_ports()
        self._register_outgoing(binary_ports, switch_ports, cover_ports)
        await self._register_incoming(switch_ports, cover_ports)

    async def async_unload(self) -> None:
        """Remove every listener and undo the KNX event registrations."""
        for unsub in self._unsub_listeners:
            unsub()
        self._unsub_listeners.clear()

        if self._knx_event_unsub is not None:
            self._knx_event_unsub()
            self._knx_event_unsub = None

        await self._unregister_knx_events()

        # Drop the lookup tables as well so that a later ``async_setup`` on
        # this instance does not inherit addresses of removed ports.
        self._address_handlers.clear()
        self._address_options.clear()

    def _load_ports(
        self,
    ) -> tuple[list[BinarySensorPort], list[SwitchPort], list[CoverPort]]:
        """Build typed port objects from the entry's port configurations.

        Port types other than ``binary_sensor``, ``switch`` and ``cover`` are
        ignored.

        Raises:
            KeyError: a recognised port has no ``entity_id``.
        """
        binary_ports: list[BinarySensorPort] = []
        switch_ports: list[SwitchPort] = []
        cover_ports: list[CoverPort] = []

        for port_type, data in _iter_port_configs(self.entry):
            if port_type == "binary_sensor":
                binary_ports.append(
                    BinarySensorPort(
                        entity_id=data["entity_id"],
                        **_address_fields(data, "state", CONF_STATE_ADDRESS),
                    )
                )
            elif port_type == "switch":
                switch_ports.append(
                    SwitchPort(
                        entity_id=data["entity_id"],
                        **_address_fields(data, "command", CONF_COMMAND_ADDRESS),
                        **_address_fields(data, "state", CONF_STATE_ADDRESS),
                    )
                )
            elif port_type == "cover":
                cover_fields: dict[str, Any] = {}
                for prefix, address_key in (
                    _COVER_INCOMING_ACTIONS + _COVER_STATE_FIELDS
                ):
                    cover_fields.update(_address_fields(data, prefix, address_key))
                cover_ports.append(
                    CoverPort(entity_id=data["entity_id"], **cover_fields)
                )

        return binary_ports, switch_ports, cover_ports

    def _register_outgoing(
        self,
        binary_ports: list[BinarySensorPort],
        switch_ports: list[SwitchPort],
        cover_ports: list[CoverPort],
    ) -> None:
        """Track entity state changes that have to be forwarded to KNX.

        Ports without any state address are skipped.
        """
        # Binary sensors and switches share the same shape: one state address
        # fed by an on/off entity.
        on_off_groups = (
            (binary_ports, self._binary_sensor_changed),
            (switch_ports, self._switch_changed),
        )
        for ports, make_handler in on_off_groups:
            for port in ports:
                if not port.state_address:
                    continue
                self._remember_address_options(
                    port.state_address,
                    _get_value_type(CONF_STATE_ADDRESS),
                    port.state_invert_incoming,
                    port.state_invert_outgoing,
                )
                self._unsub_listeners.append(
                    event_helper.async_track_state_change_event(
                        self.hass, [port.entity_id], make_handler(port)
                    )
                )

        for port in cover_ports:
            has_state_address = False
            for prefix, address_key in _COVER_STATE_FIELDS:
                address = getattr(port, f"{prefix}_address")
                if not address:
                    continue
                has_state_address = True
                self._remember_address_options(
                    address,
                    _get_value_type(address_key),
                    getattr(port, f"{prefix}_invert_incoming"),
                    getattr(port, f"{prefix}_invert_outgoing"),
                )
            if not has_state_address:
                continue
            self._unsub_listeners.append(
                event_helper.async_track_state_change_event(
                    self.hass, [port.entity_id], self._cover_changed(port)
                )
            )

    async def _register_incoming(
        self, switch_ports: list[SwitchPort], cover_ports: list[CoverPort]
    ) -> None:
        """Register handlers for telegrams coming from KNX.

        The ``knx_event`` listener and the ``knx.event_register`` calls are
        only set up when at least one address has a handler.
        """
        for port in switch_ports:
            self._register_knx_switch_address(
                port.command_address,
                port.command_invert_incoming,
                port.command_invert_outgoing,
                port,
            )

        for port in cover_ports:
            for action, address_key in _COVER_INCOMING_ACTIONS:
                self._register_knx_address(
                    getattr(port, f"{action}_address"),
                    address_key,
                    getattr(port, f"{action}_invert_incoming"),
                    getattr(port, f"{action}_invert_outgoing"),
                    port,
                    action,
                )

        if self._address_handlers:
            # Listen on the event bus directly; ``async_listen`` returns the
            # callable that removes the listener again.
            self._knx_event_unsub = self.hass.bus.async_listen(
                "knx_event", self._handle_knx_event
            )
            await self._register_knx_events()

    def _register_knx_address(
        self,
        address: str | None,
        address_key: str,
        invert_incoming: bool,
        invert_outgoing: bool,
        port: CoverPort,
        action: str,
    ) -> None:
        """Route telegrams for ``address`` to a cover action of ``port``.

        Does nothing when ``address`` is empty.  A later registration for the
        same address replaces the earlier handler and options.
        """
        if not address:
            return

        self._address_handlers[address] = lambda event: self._handle_cover_action(
            port, action, event
        )
        value_type = _get_value_type(address_key)
        self._remember_address_options(
            address, value_type, invert_incoming, invert_outgoing
        )
        self._registered_addresses.append((address, value_type))

    def _register_knx_switch_address(
        self,
        address: str | None,
        invert_incoming: bool,
        invert_outgoing: bool,
        port: SwitchPort,
    ) -> None:
        """Route telegrams for ``address`` to on/off commands of ``port``.

        Does nothing when ``address`` is empty.
        """
        if not address:
            return

        self._address_handlers[address] = lambda event: self._handle_switch_action(
            port, event
        )
        self._remember_address_options(
            address,
            _get_value_type(CONF_COMMAND_ADDRESS),
            invert_incoming,
            invert_outgoing,
        )
        # Registered without a value type, so no "type" is sent to
        # ``knx.event_register``; the command is then read from the raw
        # payload by ``_extract_event_value``.
        self._registered_addresses.append((address, None))

    def _remember_address_options(
        self,
        address: str,
        value_type: str | None,
        invert_incoming: bool,
        invert_outgoing: bool,
    ) -> None:
        """Store value type and invert flags for ``address``.

        Existing options for the same address are overwritten.
        """
        self._address_options[address] = AddressOptions(
            value_type=value_type,
            invert_incoming=invert_incoming,
            invert_outgoing=invert_outgoing,
        )

    async def _register_knx_events(self) -> None:
        """Call ``knx.event_register`` for every collected address."""
        for address, value_type in self._registered_addresses:
            data: dict[str, Any] = {"address": address}
            if value_type:
                data["type"] = value_type
            await self.hass.services.async_call(
                KNX_DOMAIN, "event_register", data, blocking=False
            )

    async def _unregister_knx_events(self) -> None:
        """Call ``knx.event_register`` with ``remove`` for every address."""
        for address, value_type in self._registered_addresses:
            data: dict[str, Any] = {"address": address, "remove": True}
            if value_type:
                data["type"] = value_type
            await self.hass.services.async_call(
                KNX_DOMAIN, "event_register", data, blocking=False
            )
        self._registered_addresses.clear()

    def _on_off_forwarder(
        self, address: str | None
    ) -> Callable[[Event], Coroutine[Any, Any, None]]:
        """Return a state-change handler sending on/off as 1/0 to ``address``."""

        async def _handler(event: Event) -> None:
            new_state: State | None = event.data.get("new_state")
            # States other than "on"/"off" are not forwarded.
            if new_state is None or new_state.state not in ("on", "off"):
                return
            payload = _invert_value(
                1 if new_state.state == "on" else 0,
                address,
                self._address_options,
                "outgoing",
            )
            await self._knx_send_raw(address, payload)

        return _handler

    def _binary_sensor_changed(
        self, port: BinarySensorPort
    ) -> Callable[[Event], Coroutine[Any, Any, None]]:
        """Return the state-change handler for a binary sensor port."""
        return self._on_off_forwarder(port.state_address)

    def _switch_changed(
        self, port: SwitchPort
    ) -> Callable[[Event], Coroutine[Any, Any, None]]:
        """Return the state-change handler for a switch port."""
        return self._on_off_forwarder(port.state_address)

    def _cover_changed(
        self, port: CoverPort
    ) -> Callable[[Event], Coroutine[Any, Any, None]]:
        """Return the state-change handler for a cover port.

        The handler sends the ``current_position`` and
        ``current_tilt_position`` attributes, when present, to the
        corresponding state addresses.
        """

        async def _handler(event: Event) -> None:
            new_state: State | None = event.data.get("new_state")
            if new_state is None:
                return

            if port.position_state_address is not None:
                position = new_state.attributes.get("current_position")
                if position is not None:
                    position = _invert_value(
                        position,
                        port.position_state_address,
                        self._address_options,
                        "outgoing",
                    )
                    await self._knx_send_percent(
                        port.position_state_address, position
                    )

            if port.angle_state_address is not None:
                angle = new_state.attributes.get("current_tilt_position")
                if angle is not None:
                    angle = _invert_value(
                        angle,
                        port.angle_state_address,
                        self._address_options,
                        "outgoing",
                    )
                    await self._knx_send_percent(port.angle_state_address, angle)

        return _handler

    async def _handle_knx_event(self, event: Event) -> None:
        """Dispatch an incoming ``knx_event`` to the handler of its destination.

        Events whose direction is not ``"Incoming"`` and events for addresses
        without a handler are ignored.
        """
        if event.data.get("direction") != "Incoming":
            return
        destination = event.data.get("destination")
        if not destination:
            return
        handler = self._address_handlers.get(destination)
        if handler is None:
            return
        await handler(event)

    async def _handle_cover_action(
        self, port: CoverPort, action: str, event: Event
    ) -> None:
        """Translate a telegram into a ``cover`` service call for ``port``.

        ``action`` is one of ``move_long``, ``move_short``, ``stop``,
        ``position`` or ``angle``; anything else is ignored.
        """
        value = _extract_event_value(event)
        if value is None:
            return
        value = _invert_value(
            value, event.data.get("destination"), self._address_options, "incoming"
        )

        if action == "move_long":
            if value == 0:
                await self._call_cover_service(port.entity_id, "open_cover")
            elif value == 1:
                await self._call_cover_service(port.entity_id, "close_cover")
        elif action in ("move_short", "stop"):
            # Both a short move and an explicit stop halt the cover.
            if value in (0, 1):
                await self._call_cover_service(port.entity_id, "stop_cover")
        elif action == "position":
            await self._call_cover_service(
                port.entity_id, "set_cover_position", {"position": value}
            )
        elif action == "angle":
            await self._call_cover_service(
                port.entity_id, "set_cover_tilt_position", {"tilt_position": value}
            )

    async def _handle_switch_action(self, port: SwitchPort, event: Event) -> None:
        """Translate a telegram into ``switch.turn_on`` / ``switch.turn_off``."""
        value = _extract_event_value(event)
        if value is None:
            return
        value = _invert_value(
            value, event.data.get("destination"), self._address_options, "incoming"
        )
        if value == 0:
            await self._call_switch_service(port.entity_id, "turn_off")
        elif value == 1:
            await self._call_switch_service(port.entity_id, "turn_on")

    async def _call_entity_service(
        self,
        domain: str,
        entity_id: str,
        service: str,
        service_data: dict[str, Any] | None,
    ) -> None:
        """Call ``domain.service`` for ``entity_id`` without blocking."""
        data: dict[str, Any] = {"entity_id": entity_id}
        if service_data:
            data.update(service_data)
        await self.hass.services.async_call(domain, service, data, blocking=False)

    async def _call_cover_service(
        self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
    ) -> None:
        """Call a service of the ``cover`` domain for ``entity_id``."""
        await self._call_entity_service("cover", entity_id, service, service_data)

    async def _call_switch_service(
        self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
    ) -> None:
        """Call a service of the ``switch`` domain for ``entity_id``."""
        await self._call_entity_service("switch", entity_id, service, service_data)

    async def _knx_send_raw(self, address: str | None, payload: int) -> None:
        """Send ``payload`` to ``address`` with ``payload_length`` 0.

        Does nothing when ``address`` is empty.
        """
        if not address:
            return
        await self.hass.services.async_call(
            KNX_DOMAIN,
            "send",
            {"address": address, "payload": payload, "payload_length": 0},
            blocking=False,
        )

    async def _knx_send_percent(self, address: str | None, value: int) -> None:
        """Send ``value`` to ``address`` using the ``percent`` type.

        Does nothing when ``address`` is empty.
        """
        if not address:
            return
        await self.hass.services.async_call(
            KNX_DOMAIN,
            "send",
            {"address": address, "payload": value, "type": "percent"},
            blocking=False,
        )


def _extract_event_value(event: Event) -> int | None:
    """Return the integer carried by a ``knx_event``, or ``None``.

    A non-``None`` ``"value"`` entry is converted with ``int``.  Otherwise
    the raw ``"data"`` entry is used: its first element when it is a
    non-empty list or tuple, reduced to its lowest bit.
    """
    value = event.data.get("value")
    # NOTE(review): the KNX integration appears to always include "value" and
    # sets it to None for addresses registered without a type - confirm.
    # Treating None like a missing key lets those events use the raw payload.
    if value is not None:
        try:
            return int(value)
        except (TypeError, ValueError):
            return None

    data = event.data.get("data")
    if data is None:
        return None
    if isinstance(data, (list, tuple)) and data:
        data = data[0]
    try:
        return int(data) & 1
    except (TypeError, ValueError):
        return None


def _clean_address(address: Any) -> str | None:
    """Return ``address`` stripped, or ``None`` if it is not a non-blank string."""
    if not isinstance(address, str):
        return None
    return address.strip() or None


def _clean_bool(value: Any) -> bool:
    """Return the truthiness of ``value``."""
    return bool(value)


def _address_fields(
    data: Mapping[str, Any], prefix: str, address_key: str
) -> dict[str, Any]:
    """Return the address and invert-flag constructor fields for one address.

    The keys are ``<prefix>_address``, ``<prefix>_invert_incoming`` and
    ``<prefix>_invert_outgoing``, matching the port dataclass field names.
    """
    return {
        f"{prefix}_address": _clean_address(data.get(address_key)),
        f"{prefix}_invert_incoming": _clean_bool(
            data.get(_invert_in_key(address_key))
        ),
        f"{prefix}_invert_outgoing": _clean_bool(
            data.get(_invert_out_key(address_key))
        ),
    }


def _invert_value(
    value: int,
    address: str | None,
    address_options: dict[str, AddressOptions],
    direction: str,
) -> int:
    """Invert ``value`` if inversion is enabled for ``address``.

    ``direction`` selects the flag: ``"incoming"`` uses ``invert_incoming``,
    any other value uses ``invert_outgoing``.  Percent values become
    ``100 - value``; otherwise 0 and 1 are swapped and other values are
    returned unchanged.
    """
    if address is None:
        return value
    options = address_options.get(address)
    if options is None:
        return value

    if direction == "incoming":
        enabled = options.invert_incoming
    else:
        enabled = options.invert_outgoing
    if not enabled:
        return value

    if options.value_type == "percent":
        return 100 - value
    if value not in (0, 1):
        return value
    return 0 if value == 1 else 1


def _invert_in_key(address_key: str) -> str:
    """Return the config key of the incoming-invert flag for ``address_key``."""
    return f"{address_key}_{CONF_INVERT_INCOMING}"


def _invert_out_key(address_key: str) -> str:
    """Return the config key of the outgoing-invert flag for ``address_key``."""
    return f"{address_key}_{CONF_INVERT_OUTGOING}"


def _get_value_type(address_key: str) -> str | None:
    """Return the value type mapped to ``address_key``, if any."""
    return ADDRESS_VALUE_TYPE.get(address_key)


def _iter_port_configs(entry: ConfigEntry) -> list[tuple[str, dict[str, Any]]]:
    """Collect ``(port type, port data)`` pairs from ``entry``.

    Subentries come first, followed by the ports listed under ``CONF_PORTS``
    in the entry options.  Option ports without a type or with non-dict data
    are skipped.
    """
    ports: list[tuple[str, dict[str, Any]]] = []

    subentries = getattr(entry, "subentries", None) or ()
    # NOTE(review): Home Assistant exposes subentries as a mapping keyed by
    # subentry id - confirm.  Iterate the values so the objects, not the ids,
    # are inspected; plain iterables are still accepted.
    if isinstance(subentries, Mapping):
        subentries = subentries.values()
    for subentry in subentries:
        # Prefer ``subentry_type``; fall back to ``type`` for objects that
        # only provide the latter.
        port_type = getattr(subentry, "subentry_type", None) or getattr(
            subentry, "type", None
        )
        ports.append((port_type, subentry.data))

    for port in entry.options.get(CONF_PORTS) or []:
        port_type = port.get("type")
        data = port.get("data", {})
        if port_type and isinstance(data, dict):
            ports.append((port_type, data))

    return ports