Files
HA-KNX-Bridge/custom_components/ha_knx_bridge/bridge.py
2026-02-15 21:00:24 +01:00

1770 lines
63 KiB
Python

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
import logging
from typing import Any
from homeassistant.components.light import (
ATTR_BRIGHTNESS,
ATTR_COLOR_MODE,
ATTR_COLOR_TEMP,
ATTR_HS_COLOR,
ATTR_RGB_COLOR,
ATTR_RGBW_COLOR,
ATTR_XY_COLOR,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import Event, HomeAssistant, State
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import event as event_helper
from homeassistant.util import color as color_util
from .const import (
CONF_ANGLE_ADDRESS,
CONF_ANGLE_STATE_ADDRESS,
ADDRESS_EVENT_TYPE,
ADDRESS_VALUE_TYPE,
CONF_COMMAND_ADDRESS,
CONF_INVERT_OUTGOING,
CONF_LIGHT_ADDRESS,
CONF_LIGHT_STATE_ADDRESS,
CONF_LIGHT_BRIGHTNESS_ADDRESS,
CONF_LIGHT_BRIGHTNESS_STATE_ADDRESS,
CONF_LIGHT_COLOR_ADDRESS,
CONF_LIGHT_COLOR_STATE_ADDRESS,
CONF_LIGHT_RGBW_ADDRESS,
CONF_LIGHT_RGBW_STATE_ADDRESS,
CONF_LIGHT_HUE_ADDRESS,
CONF_LIGHT_HUE_STATE_ADDRESS,
CONF_LIGHT_SATURATION_ADDRESS,
CONF_LIGHT_SATURATION_STATE_ADDRESS,
CONF_LIGHT_XYY_ADDRESS,
CONF_LIGHT_XYY_STATE_ADDRESS,
CONF_LIGHT_COLOR_TEMPERATURE_ADDRESS,
CONF_LIGHT_COLOR_TEMPERATURE_STATE_ADDRESS,
CONF_LIGHT_COLOR_TEMPERATURE_MODE,
CONF_LIGHT_RELATIVE_DIMMING_ADDRESS,
CONF_LIGHT_MIN_KELVIN,
CONF_LIGHT_MAX_KELVIN,
CONF_LIGHT_RED_ADDRESS,
CONF_LIGHT_RED_STATE_ADDRESS,
CONF_LIGHT_RED_BRIGHTNESS_ADDRESS,
CONF_LIGHT_RED_BRIGHTNESS_STATE_ADDRESS,
CONF_LIGHT_GREEN_ADDRESS,
CONF_LIGHT_GREEN_STATE_ADDRESS,
CONF_LIGHT_GREEN_BRIGHTNESS_ADDRESS,
CONF_LIGHT_GREEN_BRIGHTNESS_STATE_ADDRESS,
CONF_LIGHT_BLUE_ADDRESS,
CONF_LIGHT_BLUE_STATE_ADDRESS,
CONF_LIGHT_BLUE_BRIGHTNESS_ADDRESS,
CONF_LIGHT_BLUE_BRIGHTNESS_STATE_ADDRESS,
CONF_LIGHT_WHITE_ADDRESS,
CONF_LIGHT_WHITE_STATE_ADDRESS,
CONF_LIGHT_WHITE_BRIGHTNESS_ADDRESS,
CONF_LIGHT_WHITE_BRIGHTNESS_STATE_ADDRESS,
LIGHT_COLOR_TEMPERATURE_MODES,
CONF_MOVE_LONG_ADDRESS,
CONF_MOVE_SHORT_ADDRESS,
CONF_PORT_ENABLED,
CONF_PORTS,
CONF_POSITION_ADDRESS,
CONF_POSITION_STATE_ADDRESS,
CONF_STATE_ADDRESS,
CONF_STOP_ADDRESS,
)
# Logger named after this module.
_LOGGER = logging.getLogger(__name__)

# Service domain of the KNX integration that this module calls into.
KNX_DOMAIN = "knx"

# Use the constant shipped by the light component when it can be imported;
# otherwise fall back to the literal attribute key.
try:
    from homeassistant.components.light import ATTR_COLOR_TEMP_KELVIN
except ImportError:  # pragma: no cover - fallback for older HA
    ATTR_COLOR_TEMP_KELVIN = "color_temp_kelvin"
@dataclass(frozen=True)
class BinarySensorPort:
    """Immutable settings for one bridged binary sensor entity."""

    # Entity whose state changes are tracked.
    entity_id: str
    # Address the on/off state is sent to; None when not configured.
    state_address: str | None
    # Invert flag stored for the state address.
    state_invert_outgoing: bool
@dataclass(frozen=True)
class CoverPort:
    """Immutable settings for one bridged cover entity.

    Plain ``*_address`` fields are addresses whose incoming telegrams trigger
    cover services; ``*_state_address`` fields receive the entity's state.
    Every address may be None when not configured.
    """

    # Entity that is controlled and whose state changes are tracked.
    entity_id: str

    # Incoming command addresses.
    move_long_address: str | None
    move_short_address: str | None
    stop_address: str | None
    position_address: str | None

    # Outgoing position feedback and its invert flag.
    position_state_address: str | None
    position_state_invert_outgoing: bool

    # Incoming tilt command, outgoing tilt feedback and its invert flag.
    angle_address: str | None
    angle_state_address: str | None
    angle_state_invert_outgoing: bool
@dataclass(frozen=True)
class SwitchPort:
    """Immutable settings for one bridged switch entity."""

    # Entity that is controlled and whose state changes are tracked.
    entity_id: str
    # Address whose incoming telegrams turn the switch on/off; may be None.
    command_address: str | None
    # Address the on/off state is sent to; may be None.
    state_address: str | None
    # Invert flag stored for the state address.
    state_invert_outgoing: bool
@dataclass(frozen=True)
class AddressOptions:
    """Immutable per-address settings kept by the bridge manager."""

    # Value type recorded for the address; None when there is none.
    value_type: str | None
    # Invert flag recorded for the address.
    invert_outgoing: bool
@dataclass(frozen=True)
class LightPort:
    """Immutable settings for one bridged light entity.

    Plain ``*_address`` fields are addresses whose incoming telegrams drive
    the light; ``*_state_address`` fields receive the light's state, and
    ``*_invert_outgoing`` carries the invert flag for the matching state
    address. Every address may be None when not configured.
    """

    # Entity that is controlled and whose state changes are tracked.
    entity_id: str

    # On/off.
    address: str | None
    state_address: str | None
    state_invert_outgoing: bool

    # Overall brightness.
    brightness_address: str | None
    brightness_state_address: str | None
    brightness_state_invert_outgoing: bool

    # Combined colour values.
    color_address: str | None
    color_state_address: str | None
    rgbw_address: str | None
    rgbw_state_address: str | None

    # Hue and saturation.
    hue_address: str | None
    hue_state_address: str | None
    saturation_address: str | None
    saturation_state_address: str | None
    saturation_state_invert_outgoing: bool

    # xyY colour.
    xyy_address: str | None
    xyy_state_address: str | None

    # Colour temperature; the mode string selects how values are encoded.
    color_temperature_address: str | None
    color_temperature_state_address: str | None
    color_temperature_state_invert_outgoing: bool
    color_temperature_mode: str
    min_kelvin: int | None
    max_kelvin: int | None

    # Relative dimming commands.
    relative_dimming_address: str | None

    # Red channel.
    red_address: str | None
    red_state_address: str | None
    red_state_invert_outgoing: bool
    red_brightness_address: str | None
    red_brightness_state_address: str | None
    red_brightness_state_invert_outgoing: bool

    # Green channel.
    green_address: str | None
    green_state_address: str | None
    green_state_invert_outgoing: bool
    green_brightness_address: str | None
    green_brightness_state_address: str | None
    green_brightness_state_invert_outgoing: bool

    # Blue channel.
    blue_address: str | None
    blue_state_address: str | None
    blue_state_invert_outgoing: bool
    blue_brightness_address: str | None
    blue_brightness_state_address: str | None
    blue_brightness_state_invert_outgoing: bool

    # White channel.
    white_address: str | None
    white_state_address: str | None
    white_state_invert_outgoing: bool
    white_brightness_address: str | None
    white_brightness_state_address: str | None
    white_brightness_state_invert_outgoing: bool
class BridgeManager:
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
self.hass = hass
self.entry = entry
self._unsub_listeners: list[callable] = []
self._knx_event_unsub: callable | None = None
self._address_handlers: dict[str, callable[[Event], Any]] = {}
self._registered_addresses: list[tuple[str, str | None]] = []
self._address_options: dict[str, AddressOptions] = {}
self._light_components: dict[str, dict[str, int]] = {}
async def async_setup(self) -> None:
    """Load the ports and register both directions of the bridge.

    Raises:
        ConfigEntryNotReady: If the ``send`` service of the KNX domain is
            not registered.
    """
    knx_available = self.hass.services.has_service(KNX_DOMAIN, "send")
    if not knx_available:
        raise ConfigEntryNotReady("KNX integration services not available")

    binaries, switches, covers, lights = self._load_ports()
    # Entity state -> KNX first, then KNX -> entity commands.
    self._register_outgoing(binaries, switches, covers, lights)
    await self._register_incoming(switches, covers, lights)
async def async_unload(self) -> None:
for unsub in self._unsub_listeners:
unsub()
self._unsub_listeners.clear()
if self._knx_event_unsub is not None:
self._knx_event_unsub()
self._knx_event_unsub = None
await self._unregister_knx_events()
def _load_ports(
    self,
) -> tuple[
    list[BinarySensorPort],
    list[SwitchPort],
    list[CoverPort],
    list[LightPort],
]:
    """Turn the config entry's port configs into typed port objects.

    Ports rejected by ``_is_port_enabled`` are skipped, and port types other
    than ``binary_sensor``, ``switch``, ``cover`` and ``light`` are ignored.

    Returns:
        The binary sensor, switch, cover and light ports, in that order.
    """

    def addr(data: Any, key: str) -> Any:
        # Value stored under ``key``, passed through ``_clean_address``.
        return _clean_address(data.get(key))

    def inv(data: Any, key: str) -> Any:
        # Invert flag stored under the key derived from ``key``.
        return _clean_bool(data.get(_invert_out_key(key)))

    binary_ports: list[BinarySensorPort] = []
    switch_ports: list[SwitchPort] = []
    cover_ports: list[CoverPort] = []
    light_ports: list[LightPort] = []
    enabled_overrides = dict(self.entry.options.get(CONF_PORT_ENABLED, {}))

    for port in _iter_port_configs(self.entry):
        port_type = port.port_type
        data = port.data
        if not _is_port_enabled(port, enabled_overrides):
            continue

        if port_type == "binary_sensor":
            binary_ports.append(
                BinarySensorPort(
                    entity_id=data["entity_id"],
                    state_address=addr(data, CONF_STATE_ADDRESS),
                    state_invert_outgoing=inv(data, CONF_STATE_ADDRESS),
                )
            )
        elif port_type == "switch":
            switch_ports.append(
                SwitchPort(
                    entity_id=data["entity_id"],
                    command_address=addr(data, CONF_COMMAND_ADDRESS),
                    state_address=addr(data, CONF_STATE_ADDRESS),
                    state_invert_outgoing=inv(data, CONF_STATE_ADDRESS),
                )
            )
        elif port_type == "cover":
            cover_ports.append(
                CoverPort(
                    entity_id=data["entity_id"],
                    move_long_address=addr(data, CONF_MOVE_LONG_ADDRESS),
                    move_short_address=addr(data, CONF_MOVE_SHORT_ADDRESS),
                    stop_address=addr(data, CONF_STOP_ADDRESS),
                    position_address=addr(data, CONF_POSITION_ADDRESS),
                    position_state_address=addr(
                        data, CONF_POSITION_STATE_ADDRESS
                    ),
                    position_state_invert_outgoing=inv(
                        data, CONF_POSITION_STATE_ADDRESS
                    ),
                    angle_address=addr(data, CONF_ANGLE_ADDRESS),
                    angle_state_address=addr(data, CONF_ANGLE_STATE_ADDRESS),
                    angle_state_invert_outgoing=inv(
                        data, CONF_ANGLE_STATE_ADDRESS
                    ),
                )
            )
        elif port_type == "light":
            # Modes outside the known set fall back to "absolute".
            color_temp_mode = str(
                data.get(CONF_LIGHT_COLOR_TEMPERATURE_MODE, "absolute")
            )
            if color_temp_mode not in LIGHT_COLOR_TEMPERATURE_MODES:
                color_temp_mode = "absolute"
            light_ports.append(
                LightPort(
                    entity_id=data["entity_id"],
                    address=addr(data, CONF_LIGHT_ADDRESS),
                    state_address=addr(data, CONF_LIGHT_STATE_ADDRESS),
                    state_invert_outgoing=inv(data, CONF_LIGHT_STATE_ADDRESS),
                    brightness_address=addr(
                        data, CONF_LIGHT_BRIGHTNESS_ADDRESS
                    ),
                    brightness_state_address=addr(
                        data, CONF_LIGHT_BRIGHTNESS_STATE_ADDRESS
                    ),
                    brightness_state_invert_outgoing=inv(
                        data, CONF_LIGHT_BRIGHTNESS_STATE_ADDRESS
                    ),
                    color_address=addr(data, CONF_LIGHT_COLOR_ADDRESS),
                    color_state_address=addr(
                        data, CONF_LIGHT_COLOR_STATE_ADDRESS
                    ),
                    rgbw_address=addr(data, CONF_LIGHT_RGBW_ADDRESS),
                    rgbw_state_address=addr(
                        data, CONF_LIGHT_RGBW_STATE_ADDRESS
                    ),
                    hue_address=addr(data, CONF_LIGHT_HUE_ADDRESS),
                    hue_state_address=addr(data, CONF_LIGHT_HUE_STATE_ADDRESS),
                    saturation_address=addr(
                        data, CONF_LIGHT_SATURATION_ADDRESS
                    ),
                    saturation_state_address=addr(
                        data, CONF_LIGHT_SATURATION_STATE_ADDRESS
                    ),
                    saturation_state_invert_outgoing=inv(
                        data, CONF_LIGHT_SATURATION_STATE_ADDRESS
                    ),
                    xyy_address=addr(data, CONF_LIGHT_XYY_ADDRESS),
                    xyy_state_address=addr(data, CONF_LIGHT_XYY_STATE_ADDRESS),
                    color_temperature_address=addr(
                        data, CONF_LIGHT_COLOR_TEMPERATURE_ADDRESS
                    ),
                    color_temperature_state_address=addr(
                        data, CONF_LIGHT_COLOR_TEMPERATURE_STATE_ADDRESS
                    ),
                    color_temperature_state_invert_outgoing=inv(
                        data, CONF_LIGHT_COLOR_TEMPERATURE_STATE_ADDRESS
                    ),
                    color_temperature_mode=color_temp_mode,
                    min_kelvin=_clean_int(data.get(CONF_LIGHT_MIN_KELVIN)),
                    max_kelvin=_clean_int(data.get(CONF_LIGHT_MAX_KELVIN)),
                    relative_dimming_address=addr(
                        data, CONF_LIGHT_RELATIVE_DIMMING_ADDRESS
                    ),
                    red_address=addr(data, CONF_LIGHT_RED_ADDRESS),
                    red_state_address=addr(data, CONF_LIGHT_RED_STATE_ADDRESS),
                    red_state_invert_outgoing=inv(
                        data, CONF_LIGHT_RED_STATE_ADDRESS
                    ),
                    red_brightness_address=addr(
                        data, CONF_LIGHT_RED_BRIGHTNESS_ADDRESS
                    ),
                    red_brightness_state_address=addr(
                        data, CONF_LIGHT_RED_BRIGHTNESS_STATE_ADDRESS
                    ),
                    red_brightness_state_invert_outgoing=inv(
                        data, CONF_LIGHT_RED_BRIGHTNESS_STATE_ADDRESS
                    ),
                    green_address=addr(data, CONF_LIGHT_GREEN_ADDRESS),
                    green_state_address=addr(
                        data, CONF_LIGHT_GREEN_STATE_ADDRESS
                    ),
                    green_state_invert_outgoing=inv(
                        data, CONF_LIGHT_GREEN_STATE_ADDRESS
                    ),
                    green_brightness_address=addr(
                        data, CONF_LIGHT_GREEN_BRIGHTNESS_ADDRESS
                    ),
                    green_brightness_state_address=addr(
                        data, CONF_LIGHT_GREEN_BRIGHTNESS_STATE_ADDRESS
                    ),
                    green_brightness_state_invert_outgoing=inv(
                        data, CONF_LIGHT_GREEN_BRIGHTNESS_STATE_ADDRESS
                    ),
                    blue_address=addr(data, CONF_LIGHT_BLUE_ADDRESS),
                    blue_state_address=addr(
                        data, CONF_LIGHT_BLUE_STATE_ADDRESS
                    ),
                    blue_state_invert_outgoing=inv(
                        data, CONF_LIGHT_BLUE_STATE_ADDRESS
                    ),
                    blue_brightness_address=addr(
                        data, CONF_LIGHT_BLUE_BRIGHTNESS_ADDRESS
                    ),
                    blue_brightness_state_address=addr(
                        data, CONF_LIGHT_BLUE_BRIGHTNESS_STATE_ADDRESS
                    ),
                    blue_brightness_state_invert_outgoing=inv(
                        data, CONF_LIGHT_BLUE_BRIGHTNESS_STATE_ADDRESS
                    ),
                    white_address=addr(data, CONF_LIGHT_WHITE_ADDRESS),
                    white_state_address=addr(
                        data, CONF_LIGHT_WHITE_STATE_ADDRESS
                    ),
                    white_state_invert_outgoing=inv(
                        data, CONF_LIGHT_WHITE_STATE_ADDRESS
                    ),
                    white_brightness_address=addr(
                        data, CONF_LIGHT_WHITE_BRIGHTNESS_ADDRESS
                    ),
                    white_brightness_state_address=addr(
                        data, CONF_LIGHT_WHITE_BRIGHTNESS_STATE_ADDRESS
                    ),
                    white_brightness_state_invert_outgoing=inv(
                        data, CONF_LIGHT_WHITE_BRIGHTNESS_STATE_ADDRESS
                    ),
                )
            )

    return binary_ports, switch_ports, cover_ports, light_ports
def _register_outgoing(
    self,
    binary_ports: list[BinarySensorPort],
    switch_ports: list[SwitchPort],
    cover_ports: list[CoverPort],
    light_ports: list[LightPort],
) -> None:
    """Remember state-address options and track entity state changes.

    Binary sensors and switches are tracked only when they have a state
    address, covers only when they have a position or angle state address.
    Lights are always tracked.
    """

    def remember(address: str | None, conf_key: str, invert: bool) -> None:
        # The value type is looked up only for configured addresses.
        if address:
            self._remember_address_options(
                address, _get_value_type(conf_key), invert
            )

    def track(entity_id: str, handler: Any) -> None:
        # Keep the unsubscribe callback so async_unload can cancel it.
        self._unsub_listeners.append(
            event_helper.async_track_state_change_event(
                self.hass, [entity_id], handler
            )
        )

    for port in binary_ports:
        if port.state_address:
            remember(
                port.state_address,
                CONF_STATE_ADDRESS,
                port.state_invert_outgoing,
            )
            track(port.entity_id, self._binary_sensor_changed(port))

    for port in switch_ports:
        if port.state_address:
            remember(
                port.state_address,
                CONF_STATE_ADDRESS,
                port.state_invert_outgoing,
            )
            track(port.entity_id, self._switch_changed(port))

    for port in cover_ports:
        if not port.position_state_address and not port.angle_state_address:
            continue
        remember(
            port.position_state_address,
            CONF_POSITION_STATE_ADDRESS,
            port.position_state_invert_outgoing,
        )
        remember(
            port.angle_state_address,
            CONF_ANGLE_STATE_ADDRESS,
            port.angle_state_invert_outgoing,
        )
        track(port.entity_id, self._cover_changed(port))

    for port in light_ports:
        remember(
            port.state_address,
            CONF_LIGHT_STATE_ADDRESS,
            port.state_invert_outgoing,
        )
        remember(
            port.brightness_state_address,
            CONF_LIGHT_BRIGHTNESS_STATE_ADDRESS,
            port.brightness_state_invert_outgoing,
        )
        remember(
            port.saturation_state_address,
            CONF_LIGHT_SATURATION_STATE_ADDRESS,
            port.saturation_state_invert_outgoing,
        )
        # Colour temperature options are remembered only in relative mode,
        # with a fixed "percent" value type.
        if (
            port.color_temperature_state_address
            and port.color_temperature_mode == "relative"
        ):
            self._remember_address_options(
                port.color_temperature_state_address,
                "percent",
                port.color_temperature_state_invert_outgoing,
            )
        channel_states = (
            (
                port.red_state_address,
                CONF_LIGHT_RED_STATE_ADDRESS,
                port.red_state_invert_outgoing,
            ),
            (
                port.red_brightness_state_address,
                CONF_LIGHT_RED_BRIGHTNESS_STATE_ADDRESS,
                port.red_brightness_state_invert_outgoing,
            ),
            (
                port.green_state_address,
                CONF_LIGHT_GREEN_STATE_ADDRESS,
                port.green_state_invert_outgoing,
            ),
            (
                port.green_brightness_state_address,
                CONF_LIGHT_GREEN_BRIGHTNESS_STATE_ADDRESS,
                port.green_brightness_state_invert_outgoing,
            ),
            (
                port.blue_state_address,
                CONF_LIGHT_BLUE_STATE_ADDRESS,
                port.blue_state_invert_outgoing,
            ),
            (
                port.blue_brightness_state_address,
                CONF_LIGHT_BLUE_BRIGHTNESS_STATE_ADDRESS,
                port.blue_brightness_state_invert_outgoing,
            ),
            (
                port.white_state_address,
                CONF_LIGHT_WHITE_STATE_ADDRESS,
                port.white_state_invert_outgoing,
            ),
            (
                port.white_brightness_state_address,
                CONF_LIGHT_WHITE_BRIGHTNESS_STATE_ADDRESS,
                port.white_brightness_state_invert_outgoing,
            ),
        )
        for address, conf_key, invert in channel_states:
            remember(address, conf_key, invert)
        track(port.entity_id, self._light_changed(port))
async def _register_incoming(
    self,
    switch_ports: list[SwitchPort],
    cover_ports: list[CoverPort],
    light_ports: list[LightPort],
) -> None:
    """Register handlers for every command address, then start listening.

    The "knx_event" listener is attached only when at least one handler was
    registered. The collected addresses are then passed to the KNX
    integration through ``_register_knx_events``.
    """
    for port in switch_ports:
        self._register_knx_switch_address(port.command_address, port)

    for port in cover_ports:
        cover_commands = (
            (port.move_long_address, CONF_MOVE_LONG_ADDRESS, "move_long"),
            (port.move_short_address, CONF_MOVE_SHORT_ADDRESS, "move_short"),
            (port.stop_address, CONF_STOP_ADDRESS, "stop"),
            (port.position_address, CONF_POSITION_ADDRESS, "position"),
            (port.angle_address, CONF_ANGLE_ADDRESS, "angle"),
        )
        for address, conf_key, action in cover_commands:
            self._register_knx_address(address, conf_key, port, action)

    for port in light_ports:
        basic_commands = (
            (port.address, CONF_LIGHT_ADDRESS, "switch"),
            (
                port.brightness_address,
                CONF_LIGHT_BRIGHTNESS_ADDRESS,
                "brightness",
            ),
            (port.color_address, CONF_LIGHT_COLOR_ADDRESS, "color"),
            (port.rgbw_address, CONF_LIGHT_RGBW_ADDRESS, "rgbw"),
            (port.hue_address, CONF_LIGHT_HUE_ADDRESS, "hue"),
            (
                port.saturation_address,
                CONF_LIGHT_SATURATION_ADDRESS,
                "saturation",
            ),
            (port.xyy_address, CONF_LIGHT_XYY_ADDRESS, "xyy"),
        )
        for address, conf_key, action in basic_commands:
            self._register_knx_light_command(
                address, _get_event_type(conf_key), port, action
            )

        # The colour temperature event type depends on the configured mode.
        self._register_knx_light_command(
            port.color_temperature_address,
            _light_color_temperature_event_type(port.color_temperature_mode),
            port,
            "color_temperature",
        )
        # Relative dimming is registered without an event type.
        self._register_knx_light_command(
            port.relative_dimming_address,
            None,
            port,
            "relative_dimming",
        )

        channel_commands = (
            (port.red_address, CONF_LIGHT_RED_ADDRESS, "red_switch"),
            (
                port.red_brightness_address,
                CONF_LIGHT_RED_BRIGHTNESS_ADDRESS,
                "red_brightness",
            ),
            (port.green_address, CONF_LIGHT_GREEN_ADDRESS, "green_switch"),
            (
                port.green_brightness_address,
                CONF_LIGHT_GREEN_BRIGHTNESS_ADDRESS,
                "green_brightness",
            ),
            (port.blue_address, CONF_LIGHT_BLUE_ADDRESS, "blue_switch"),
            (
                port.blue_brightness_address,
                CONF_LIGHT_BLUE_BRIGHTNESS_ADDRESS,
                "blue_brightness",
            ),
            (port.white_address, CONF_LIGHT_WHITE_ADDRESS, "white_switch"),
            (
                port.white_brightness_address,
                CONF_LIGHT_WHITE_BRIGHTNESS_ADDRESS,
                "white_brightness",
            ),
        )
        for address, conf_key, action in channel_commands:
            self._register_knx_light_command(
                address, _get_event_type(conf_key), port, action
            )

    if self._address_handlers:
        # Use the event helper's tracker when it exists, else the bus.
        track_event = getattr(event_helper, "async_track_event", None)
        if track_event is not None:
            self._knx_event_unsub = track_event(
                self.hass, "knx_event", self._handle_knx_event
            )
        else:
            self._knx_event_unsub = self.hass.bus.async_listen(
                "knx_event", self._handle_knx_event
            )

    await self._register_knx_events()
def _register_knx_address(
self,
address: str | None,
address_key: str,
port: CoverPort,
action: str,
) -> None:
if not address:
return
self._address_handlers[address] = lambda event: self._handle_cover_action(
port, action, event
)
value_type = _get_value_type(address_key)
event_type = _get_event_type(address_key)
self._remember_address_options(
address, value_type, False
)
self._registered_addresses.append((address, event_type))
def _register_knx_switch_address(
self,
address: str | None,
port: SwitchPort,
) -> None:
if not address:
return
self._address_handlers[address] = lambda event: self._handle_switch_action(
port, event
)
self._remember_address_options(
address,
_get_value_type(CONF_COMMAND_ADDRESS),
False,
)
self._registered_addresses.append((address, _get_event_type(CONF_COMMAND_ADDRESS)))
def _register_knx_light_command(
self,
address: str | None,
event_type: str | None,
port: LightPort,
action: str,
) -> None:
if not address:
return
self._address_handlers[address] = lambda event: self._handle_light_action(
port, action, event
)
self._registered_addresses.append((address, event_type))
def _remember_address_options(
    self,
    address: str,
    value_type: str | None,
    invert_outgoing: bool,
) -> None:
    """Store the options for ``address``, replacing any earlier entry."""
    options = AddressOptions(
        value_type=value_type,
        invert_outgoing=invert_outgoing,
    )
    self._address_options[address] = options
async def _register_knx_events(self) -> None:
for address, event_type in self._registered_addresses:
data: dict[str, Any] = {"address": address}
if event_type:
data["type"] = event_type
await self.hass.services.async_call(
KNX_DOMAIN, "event_register", data, blocking=False
)
async def _unregister_knx_events(self) -> None:
for address, event_type in self._registered_addresses:
data: dict[str, Any] = {"address": address, "remove": True}
if event_type:
data["type"] = event_type
await self.hass.services.async_call(
KNX_DOMAIN, "event_register", data, blocking=False
)
self._registered_addresses.clear()
def _binary_sensor_changed(self, port: BinarySensorPort) -> callable[[Event], Any]:
async def _handler(event: Event) -> None:
new_state: State | None = event.data.get("new_state")
if new_state is None:
return
if new_state.state not in ("on", "off"):
return
payload = 1 if new_state.state == "on" else 0
payload = _invert_value(
payload, port.state_address, self._address_options
)
await self._knx_send_raw(port.state_address, payload)
return _handler
def _switch_changed(self, port: SwitchPort) -> callable[[Event], Any]:
async def _handler(event: Event) -> None:
new_state: State | None = event.data.get("new_state")
if new_state is None:
return
if new_state.state not in ("on", "off"):
return
payload = 1 if new_state.state == "on" else 0
payload = _invert_value(
payload, port.state_address, self._address_options
)
await self._knx_send_raw(port.state_address, payload)
return _handler
def _cover_changed(self, port: CoverPort) -> callable[[Event], Any]:
async def _handler(event: Event) -> None:
new_state: State | None = event.data.get("new_state")
if new_state is None:
return
if port.position_state_address is not None:
position = new_state.attributes.get("current_position")
if position is not None:
position = _invert_value(
position,
port.position_state_address,
self._address_options,
)
await self._knx_send_percent(
port.position_state_address, position
)
if port.angle_state_address is not None:
angle = new_state.attributes.get("current_tilt_position")
if angle is not None:
angle = _invert_value(
angle,
port.angle_state_address,
self._address_options,
)
await self._knx_send_percent(port.angle_state_address, angle)
return _handler
def _light_changed(self, port: LightPort) -> callable[[Event], Any]:
async def _handler(event: Event) -> None:
new_state: State | None = event.data.get("new_state")
if new_state is None:
return
if new_state.state not in ("on", "off"):
return
is_on = new_state.state == "on"
attrs = new_state.attributes
brightness = attrs.get(ATTR_BRIGHTNESS)
brightness_percent = (
_brightness_to_percent(brightness) if brightness is not None else None
)
hs_color = attrs.get(ATTR_HS_COLOR)
rgb_color = attrs.get(ATTR_RGB_COLOR)
rgbw_color = attrs.get(ATTR_RGBW_COLOR)
xy_color = attrs.get(ATTR_XY_COLOR)
color_temp_kelvin = attrs.get(ATTR_COLOR_TEMP_KELVIN)
color_temp_mired = attrs.get(ATTR_COLOR_TEMP)
if color_temp_kelvin is None and color_temp_mired is not None:
color_temp_kelvin = _mireds_to_kelvin(color_temp_mired)
if port.state_address:
payload = 1 if is_on else 0
payload = _invert_value(
payload, port.state_address, self._address_options
)
await self._knx_send_raw(port.state_address, payload)
if port.brightness_state_address:
if is_on:
if brightness_percent is not None:
payload = brightness_percent
payload = _invert_value(
payload,
port.brightness_state_address,
self._address_options,
)
await self._knx_send_percent(
port.brightness_state_address, payload
)
else:
payload = 0
payload = _invert_value(
payload,
port.brightness_state_address,
self._address_options,
)
await self._knx_send_percent(
port.brightness_state_address, payload
)
if rgb_color is None and hs_color is not None:
rgb_color = color_util.color_hs_to_RGB(*hs_color)
if rgb_color is None and xy_color is not None:
rgb_color = color_util.color_xy_to_RGB(
xy_color[0], xy_color[1], brightness or 255
)
if port.color_state_address and rgb_color is not None:
await self._knx_send_typed(
port.color_state_address, list(rgb_color), "232.600"
)
if port.rgbw_state_address and rgbw_color is not None:
await self._knx_send_typed(
port.rgbw_state_address, list(rgbw_color), "251.600"
)
if port.hue_state_address and hs_color is not None:
await self._knx_send_typed(
port.hue_state_address, int(round(hs_color[0])), "5.003"
)
if port.saturation_state_address and hs_color is not None:
payload = int(round(hs_color[1]))
payload = _invert_value(
payload, port.saturation_state_address, self._address_options
)
await self._knx_send_percent(
port.saturation_state_address, payload
)
if port.xyy_state_address and xy_color is not None:
if brightness_percent is None:
brightness_value = 100 if is_on else 0
else:
brightness_value = brightness_percent
await self._knx_send_typed(
port.xyy_state_address,
[float(xy_color[0]), float(xy_color[1]), int(brightness_value)],
"242.600",
)
if port.color_temperature_state_address and color_temp_kelvin is not None:
payload, dpt_type = _color_temperature_payload(
color_temp_kelvin,
port,
clamp=True,
)
if port.color_temperature_mode == "relative":
payload = _invert_value(
int(round(payload)),
port.color_temperature_state_address,
self._address_options,
)
await self._knx_send_typed(
port.color_temperature_state_address, payload, dpt_type
)
self._update_light_components_from_state(
port,
is_on,
brightness,
rgb_color,
rgbw_color,
)
await self._send_light_component_states(port)
return _handler
async def _handle_knx_event(self, event: Event) -> None:
direction = event.data.get("direction")
if direction is not None and not str(direction).lower().startswith("incoming"):
return
destination = _event_destination(event)
if not destination:
return
handler = self._address_handlers.get(destination)
if handler is None:
return
await handler(event)
async def _handle_cover_action(
    self, port: CoverPort, action: str, event: Event
) -> None:
    """Translate a KNX telegram into a cover service call.

    ``move_long`` opens on 0 and closes on 1; ``move_short`` and ``stop``
    stop the cover on 0 or 1; ``position`` and ``angle`` forward the value
    as the target position / tilt position. Events without a value are
    ignored.
    """
    value = _extract_event_value(event)
    if value is None:
        return
    destination = _event_destination(event)
    value = _invert_value(value, destination, self._address_options)

    service: str | None = None
    service_data: dict[str, Any] | None = None
    if action == "move_long":
        if value == 0:
            service = "open_cover"
        elif value == 1:
            service = "close_cover"
    elif action in ("move_short", "stop"):
        if value in (0, 1):
            service = "stop_cover"
    elif action == "position":
        service = "set_cover_position"
        service_data = {"position": value}
    elif action == "angle":
        service = "set_cover_tilt_position"
        service_data = {"tilt_position": value}

    if service is not None:
        await self._call_cover_service(port.entity_id, service, service_data)
async def _handle_switch_action(self, port: SwitchPort, event: Event) -> None:
    """Turn the switch off on 0 and on on 1; ignore every other value."""
    value = _extract_event_value(event)
    if value is None:
        return
    destination = _event_destination(event)
    value = _invert_value(value, destination, self._address_options)

    if value == 0:
        service = "turn_off"
    elif value == 1:
        service = "turn_on"
    else:
        return
    await self._call_switch_service(port.entity_id, service)
async def _handle_light_action(
self, port: LightPort, action: str, event: Event
) -> None:
if action == "switch":
value = _extract_event_value(event)
if value is None:
return
if value == 0:
await self._call_light_service(port.entity_id, "turn_off")
elif value == 1:
await self._call_light_service(port.entity_id, "turn_on")
return
if action == "brightness":
value = _extract_event_value(event)
if value is None:
return
value = _clamp_percent(value)
if value == 0:
await self._call_light_service(port.entity_id, "turn_off")
else:
await self._call_light_service(
port.entity_id,
"turn_on",
{"brightness": _percent_to_brightness(value)},
)
return
if action == "color":
rgb = _extract_rgb_value(event)
if rgb is None:
return
await self._call_light_service(
port.entity_id, "turn_on", {"rgb_color": rgb}
)
return
if action == "rgbw":
rgbw = _extract_rgbw_value(event)
if rgbw is None:
return
await self._call_light_service(
port.entity_id, "turn_on", {"rgbw_color": rgbw}
)
return
if action == "hue":
value = _extract_event_value(event)
if value is None:
return
hs_color = _current_hs_color(self.hass, port.entity_id)
saturation = 0 if hs_color is None else hs_color[1]
await self._call_light_service(
port.entity_id,
"turn_on",
{"hs_color": (float(value), float(saturation))},
)
return
if action == "saturation":
value = _extract_event_value(event)
if value is None:
return
value = _clamp_percent(value)
hs_color = _current_hs_color(self.hass, port.entity_id)
hue = 0 if hs_color is None else hs_color[0]
await self._call_light_service(
port.entity_id,
"turn_on",
{"hs_color": (float(hue), float(value))},
)
return
if action == "xyy":
xyy = _extract_xyy_value(event)
if xyy is None:
return
x, y, luminance = xyy
data: dict[str, Any] = {"xy_color": (float(x), float(y))}
if luminance is not None:
lum = _clamp_percent(int(round(luminance)))
if lum == 0:
await self._call_light_service(port.entity_id, "turn_off")
return
data["brightness"] = _percent_to_brightness(lum)
await self._call_light_service(port.entity_id, "turn_on", data)
return
if action == "color_temperature":
value = _extract_float_value(event)
if value is None:
return
kelvin = _color_temperature_from_value(value, port)
if kelvin is None:
return
await self._call_light_service(
port.entity_id,
"turn_on",
{"color_temp": _kelvin_to_mireds(kelvin)},
)
return
if action == "relative_dimming":
value = _extract_event_value(event)
if value is None:
return
if value in (0, 8):
return
step = _relative_dimming_step(value)
if step is None:
return
direction, percent = step
if percent <= 0:
return
if direction == "down":
percent = -percent
await self._call_light_service(
port.entity_id,
"turn_on",
{"brightness_step_pct": percent},
)
return
if action.startswith("red_"):
await self._handle_light_component_action(port, "red", action, event)
elif action.startswith("green_"):
await self._handle_light_component_action(port, "green", action, event)
elif action.startswith("blue_"):
await self._handle_light_component_action(port, "blue", action, event)
elif action.startswith("white_"):
await self._handle_light_component_action(port, "white", action, event)
async def _call_cover_service(
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
) -> None:
data = {"entity_id": entity_id}
if service_data:
data.update(service_data)
await self.hass.services.async_call(
"cover", service, data, blocking=False
)
async def _call_switch_service(
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
) -> None:
data = {"entity_id": entity_id}
if service_data:
data.update(service_data)
await self.hass.services.async_call(
"switch", service, data, blocking=False
)
async def _call_light_service(
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
) -> None:
data = {"entity_id": entity_id}
if service_data:
data.update(service_data)
await self.hass.services.async_call("light", service, data, blocking=False)
async def _knx_send_raw(self, address: str | None, payload: int) -> None:
if not address:
return
await self.hass.services.async_call(
KNX_DOMAIN,
"send",
{"address": address, "payload": payload},
blocking=False,
)
async def _knx_send_typed(
self, address: str | None, payload: Any, dpt_type: str
) -> None:
if not address:
return
await self.hass.services.async_call(
KNX_DOMAIN,
"send",
{"address": address, "payload": payload, "type": dpt_type},
blocking=False,
)
async def _knx_send_percent(self, address: str | None, value: int) -> None:
    """Send ``value`` to ``address`` with ``type`` set to ``"percent"``.

    The value is passed through ``_clamp_percent`` before sending. Nothing is
    sent when ``address`` is empty or ``None``.
    """
    if address:
        service_data = {
            "address": address,
            "payload": _clamp_percent(value),
            "type": "percent",
        }
        await self.hass.services.async_call(
            KNX_DOMAIN, "send", service_data, blocking=False
        )
def _update_light_components_from_state(
self,
port: LightPort,
is_on: bool,
brightness: int | None,
rgb_color: tuple[int, int, int] | None,
rgbw_color: tuple[int, int, int, int] | None,
) -> None:
if not is_on:
self._light_components[port.entity_id] = {"r": 0, "g": 0, "b": 0, "w": 0}
return
scale = 1.0
if brightness is not None:
scale = max(0.0, min(1.0, brightness / 255))
components = self._light_components.get(port.entity_id, {})
if rgbw_color is not None:
components = {
"r": int(round(rgbw_color[0] * scale)),
"g": int(round(rgbw_color[1] * scale)),
"b": int(round(rgbw_color[2] * scale)),
"w": int(round(rgbw_color[3] * scale)),
}
elif rgb_color is not None:
components = {
"r": int(round(rgb_color[0] * scale)),
"g": int(round(rgb_color[1] * scale)),
"b": int(round(rgb_color[2] * scale)),
"w": components.get("w", 0),
}
self._light_components[port.entity_id] = components
async def _send_light_component_states(self, port: LightPort) -> None:
    """Publish the cached level of every colour channel of ``port``.

    For red, green, blue and white (in that order) the on/off state is sent
    first, then the brightness state. Nothing is sent when there is no cached
    entry, or an empty one, for ``port.entity_id``.
    """
    components = self._light_components.get(port.entity_id)
    if not components:
        return

    # (cache key, state address, state invert, brightness address, brightness invert)
    channels = (
        (
            "r",
            port.red_state_address,
            port.red_state_invert_outgoing,
            port.red_brightness_state_address,
            port.red_brightness_state_invert_outgoing,
        ),
        (
            "g",
            port.green_state_address,
            port.green_state_invert_outgoing,
            port.green_brightness_state_address,
            port.green_brightness_state_invert_outgoing,
        ),
        (
            "b",
            port.blue_state_address,
            port.blue_state_invert_outgoing,
            port.blue_brightness_state_address,
            port.blue_brightness_state_invert_outgoing,
        ),
        (
            "w",
            port.white_state_address,
            port.white_state_invert_outgoing,
            port.white_brightness_state_address,
            port.white_brightness_state_invert_outgoing,
        ),
    )
    for key, state_address, state_invert, level_address, level_invert in channels:
        # The level is re-read for each send, exactly as the cache holds it then.
        await self._send_light_component_state(
            state_address, state_invert, components.get(key, 0)
        )
        await self._send_light_component_brightness_state(
            level_address, level_invert, components.get(key, 0)
        )
async def _send_light_component_state(
    self,
    address: str | None,
    invert_outgoing: bool,
    value: int,
) -> None:
    """Send the on/off state of one colour channel to ``address``.

    The payload is 1 when ``value`` is greater than 0, otherwise 0. When
    ``invert_outgoing`` is set it is passed through ``_invert_value`` before
    sending. Nothing is sent when ``address`` is empty or ``None``.
    """
    if not address:
        return
    bit = 1 if value > 0 else 0
    outgoing = (
        _invert_value(bit, address, self._address_options) if invert_outgoing else bit
    )
    await self._knx_send_raw(address, outgoing)
async def _send_light_component_brightness_state(
    self,
    address: str | None,
    invert_outgoing: bool,
    value: int,
) -> None:
    """Send the level of one colour channel to ``address`` as a percentage.

    ``value`` is converted with ``_brightness_to_percent``. When
    ``invert_outgoing`` is set the result is passed through ``_invert_value``
    before sending. Nothing is sent when ``address`` is empty or ``None``.
    """
    if not address:
        return
    percent = _brightness_to_percent(value)
    outgoing = (
        _invert_value(percent, address, self._address_options)
        if invert_outgoing
        else percent
    )
    await self._knx_send_percent(address, outgoing)
async def _handle_light_component_action(
    self, port: LightPort, color: str, action: str, event: Event
) -> None:
    """Apply an incoming per-channel switch or brightness action to the light.

    The first letter of ``color`` selects the cached channel. A ``*_switch``
    action sets the channel to 0 on value 0, or to 255 on value 1 if it was 0.
    A ``*_brightness`` action stores the clamped percentage converted to a
    0..255 level. Other actions, and events without a usable value, are
    ignored. Afterwards the light is turned off when every channel is 0;
    otherwise it is turned on with ``rgbw_color`` or ``rgb_color`` depending
    on ``_light_uses_white_channel``.
    """
    components = self._light_components.setdefault(
        port.entity_id, {"r": 0, "g": 0, "b": 0, "w": 0}
    )
    is_switch = action.endswith("_switch")
    if not is_switch and not action.endswith("_brightness"):
        return

    value = _extract_event_value(event)
    if value is None:
        return

    channel = color[0]
    if is_switch:
        if value == 0:
            components[channel] = 0
        elif value == 1 and components[channel] == 0:
            # Only a channel that is fully off is raised to full level.
            components[channel] = 255
    else:
        components[channel] = _percent_to_brightness(_clamp_percent(value))

    if all(level == 0 for level in components.values()):
        await self._call_light_service(port.entity_id, "turn_off")
        return

    if _light_uses_white_channel(port):
        attribute, keys = "rgbw_color", ("r", "g", "b", "w")
    else:
        attribute, keys = "rgb_color", ("r", "g", "b")
    await self._call_light_service(
        port.entity_id,
        "turn_on",
        {attribute: tuple(components[key] for key in keys)},
    )
def _extract_event_value(event: Event) -> int | None:
    """Return an integer value taken from ``event.data``.

    The ``"value"`` entry is tried first via ``_map_scalar_value``. If that
    yields nothing, the ``"data"`` entry is used (its first element when it is
    a non-empty list), again via ``_map_scalar_value``, and finally as
    ``int(data) & 1``. Returns ``None`` when no integer can be derived.
    """
    fields = event.data
    if "value" in fields:
        from_value = _map_scalar_value(fields["value"])
        if from_value is not None:
            return from_value

    raw = fields.get("data")
    if raw is None:
        return None
    if isinstance(raw, list) and raw:
        raw = raw[0]

    from_data = _map_scalar_value(raw)
    if from_data is not None:
        return from_data
    try:
        # Last resort: keep only the lowest bit of the raw payload.
        return int(raw) & 1
    except (TypeError, ValueError):
        return None
def _clean_address(address: Any) -> str | None:
if not address:
return None
if isinstance(address, str):
stripped = address.strip()
return stripped or None
return None
def _clean_bool(value: Any) -> bool:
return bool(value)
def _clean_int(value: Any) -> int | None:
if value is None or value == "":
return None
try:
return int(value)
except (TypeError, ValueError):
return None
def _invert_value(
value: int,
address: str | None,
address_options: dict[str, AddressOptions],
) -> int:
if address is None:
return value
options = address_options.get(address)
if options is None:
return value
if options.value_type == "percent":
value = _clamp_percent(value)
if not options.invert_outgoing:
return value
if options.value_type == "percent":
return 100 - value
if value not in (0, 1):
return value
return 0 if value == 1 else 1
def _invert_out_key(address_key: str) -> str:
    """Return ``address_key`` joined to ``CONF_INVERT_OUTGOING`` by an underscore."""
    suffix = CONF_INVERT_OUTGOING
    return f"{address_key}_{suffix}"
def _get_value_type(address_key: str) -> str | None:
    """Look up ``address_key`` in ``ADDRESS_VALUE_TYPE``; ``None`` if absent."""
    value_type = ADDRESS_VALUE_TYPE.get(address_key, None)
    return value_type
def _get_event_type(address_key: str) -> str | None:
    """Look up ``address_key`` in ``ADDRESS_EVENT_TYPE``; ``None`` if absent."""
    event_type = ADDRESS_EVENT_TYPE.get(address_key, None)
    return event_type
def _clamp_percent(value: int) -> int:
if value < 0:
return 0
if value > 100:
return 100
return value
def _brightness_to_percent(brightness: int) -> int:
if brightness <= 0:
return 0
if brightness >= 255:
return 100
return int(round(brightness / 255 * 100))
def _percent_to_brightness(percent: int) -> int:
    """Convert a percentage (clamped to 0..100) to a rounded 0..255 brightness."""
    clamped = _clamp_percent(percent)
    fraction = clamped / 100
    return int(round(fraction * 255))
def _kelvin_to_mireds(kelvin: float) -> int:
if kelvin <= 0:
return 0
return int(round(1000000 / kelvin))
def _mireds_to_kelvin(mireds: float) -> float:
if mireds <= 0:
return 0
return 1000000 / mireds
def _extract_rgb_value(event: Event) -> tuple[int, int, int] | None:
value = event.data.get("value")
if isinstance(value, dict):
red = value.get("red", value.get("r"))
green = value.get("green", value.get("g"))
blue = value.get("blue", value.get("b"))
if red is not None and green is not None and blue is not None:
return (int(red), int(green), int(blue))
if isinstance(value, (list, tuple)) and len(value) >= 3:
return (int(value[0]), int(value[1]), int(value[2]))
data = event.data.get("data")
if isinstance(data, (list, tuple)) and len(data) >= 3:
return (int(data[0]), int(data[1]), int(data[2]))
return None
def _extract_rgbw_value(event: Event) -> tuple[int, int, int, int] | None:
value = event.data.get("value")
if isinstance(value, dict):
red = value.get("red", value.get("r"))
green = value.get("green", value.get("g"))
blue = value.get("blue", value.get("b"))
white = value.get("white", value.get("w"))
if (
red is not None
and green is not None
and blue is not None
and white is not None
):
return (int(red), int(green), int(blue), int(white))
if isinstance(value, (list, tuple)) and len(value) >= 4:
return (int(value[0]), int(value[1]), int(value[2]), int(value[3]))
data = event.data.get("data")
if isinstance(data, (list, tuple)) and len(data) >= 4:
return (int(data[0]), int(data[1]), int(data[2]), int(data[3]))
return None
def _extract_xyy_value(
event: Event,
) -> tuple[float, float, float | None] | None:
value = event.data.get("value")
if isinstance(value, dict):
x = value.get("x")
y = value.get("y")
luminance = value.get("Y", value.get("luminance", value.get("brightness")))
if x is not None and y is not None:
return (
float(x),
float(y),
float(luminance) if luminance is not None else None,
)
if isinstance(value, (list, tuple)) and len(value) >= 2:
x = value[0]
y = value[1]
luminance = value[2] if len(value) > 2 else None
return (
float(x),
float(y),
float(luminance) if luminance is not None else None,
)
data = event.data.get("data")
if isinstance(data, (list, tuple)) and len(data) >= 2:
x = data[0]
y = data[1]
luminance = data[2] if len(data) > 2 else None
return (
float(x),
float(y),
float(luminance) if luminance is not None else None,
)
return None
def _extract_float_value(event: Event) -> float | None:
if "value" in event.data:
value = event.data["value"]
if isinstance(value, (int, float)):
return float(value)
mapped = _extract_event_value(event)
if mapped is None:
return None
return float(mapped)
def _current_hs_color(
    hass: HomeAssistant, entity_id: str
) -> tuple[float, float] | None:
    """Return the entity's current ``ATTR_HS_COLOR`` attribute as two floats.

    Returns ``None`` when the entity has no state, the attribute is missing,
    or its first two items cannot be read and converted.
    """
    state = hass.states.get(entity_id)
    hs_color = None if state is None else state.attributes.get(ATTR_HS_COLOR)
    if hs_color is None:
        return None
    try:
        hue = float(hs_color[0])
        saturation = float(hs_color[1])
    except (TypeError, ValueError, IndexError):
        return None
    return hue, saturation
def _light_color_temperature_event_type(mode: str) -> str | None:
if mode == "relative":
return "percent"
if mode == "absolute":
return "2byte_unsigned"
if mode == "absolute_float":
return "2byte_float"
return None
def _color_temperature_payload(
    kelvin: float, port: LightPort, clamp: bool = False
) -> tuple[float | int, str]:
    """Build the payload and type string for a colour temperature.

    Args:
        kelvin: Colour temperature in kelvin.
        port: Light port providing the mode and the kelvin range.
        clamp: When true, ``kelvin`` is first limited to the port's range.

    Returns:
        ``(payload, type)``: for mode ``"relative"`` the position of
        ``kelvin`` within the range as a 0..100 percentage with type
        ``"percent"``; for ``"absolute_float"`` the kelvin as ``float`` with
        type ``"9"``; otherwise the rounded kelvin with type ``"7.600"``.
    """
    min_kelvin = _light_min_kelvin(port)
    max_kelvin = _light_max_kelvin(port)
    if clamp:
        kelvin = min(max(kelvin, min_kelvin), max_kelvin)

    mode = port.color_temperature_mode
    if mode == "relative":
        span = max_kelvin - min_kelvin
        if span == 0:
            # A range of width zero has no meaningful position; report 0 %
            # instead of dividing by zero.
            return 0, "percent"
        percent = (kelvin - min_kelvin) / span * 100
        return _clamp_percent(int(round(percent))), "percent"
    if mode == "absolute_float":
        return float(kelvin), "9"
    return int(round(kelvin)), "7.600"
def _color_temperature_from_value(
    value: float, port: LightPort
) -> float | None:
    """Convert a received colour-temperature value to kelvin.

    In mode ``"relative"`` the value is a percentage (rounded and clamped)
    of the port's kelvin range. In modes ``"absolute"`` and
    ``"absolute_float"`` it is returned as a ``float``. Any other mode yields
    ``None``.
    """
    min_kelvin = _light_min_kelvin(port)
    max_kelvin = _light_max_kelvin(port)
    mode = port.color_temperature_mode
    if mode == "relative":
        percent = _clamp_percent(int(round(value)))
        return min_kelvin + (max_kelvin - min_kelvin) * (percent / 100)
    if mode in ("absolute", "absolute_float"):
        return float(value)
    return None
def _light_min_kelvin(port: LightPort) -> int:
return port.min_kelvin or 2000
def _light_max_kelvin(port: LightPort) -> int:
return port.max_kelvin or 6500
def _light_uses_white_channel(port: LightPort) -> bool:
return any(
[
port.white_address,
port.white_state_address,
port.white_brightness_address,
port.white_brightness_state_address,
]
)
def _relative_dimming_step(value: int) -> tuple[str, int] | None:
value = value & 0x0F
if value == 0 or value == 8:
return None
if value <= 7:
direction = "down"
step = value
else:
direction = "up"
step = value - 8
percent_map = {
1: 10,
2: 8,
3: 6,
4: 4,
5: 3,
6: 2,
7: 1,
}
percent = percent_map.get(step)
if percent is None:
return None
return direction, percent
def _map_scalar_value(value: Any) -> int | None:
if isinstance(value, bool):
return 1 if value else 0
if isinstance(value, (int, float)):
return int(value)
if isinstance(value, str):
text = value.strip().lower()
if text in ("on", "true", "yes", "1", "down", "close", "closed"):
return 1
if text in ("off", "false", "no", "0", "up", "open", "opened"):
return 0
try:
return int(text)
except ValueError:
return None
return None
def _event_destination(event: Event) -> str | None:
return (
event.data.get("destination")
or event.data.get("destination_address")
or event.data.get("address")
)
@dataclass(frozen=True)
class PortConfig:
    """Immutable description of one configured port."""

    # Identifier of the port; may be an empty string for option-based ports
    # that carry no "id".
    port_id: str
    # Type name of the port.
    port_type: str
    # Display title of the port.
    title: str
    # Configuration mapping of the port.
    data: dict[str, Any]
    # Stored enabled flag; None means no flag was stored.
    enabled: bool | None
def _iter_port_configs(entry: ConfigEntry) -> list[PortConfig]:
    """Collect every port configured on ``entry``.

    Ports come from two places, in this order: the entry's subentries, then
    the ``CONF_PORTS`` list in ``entry.options``. Option entries without a
    type, with non-dict data, or that are not dicts themselves are skipped.
    """
    from collections.abc import Mapping

    ports: list[PortConfig] = []

    subentries = getattr(entry, "subentries", None) or []
    if isinstance(subentries, Mapping):
        # Home Assistant exposes subentries as a mapping keyed by subentry id;
        # iterating the mapping itself would yield only the keys.
        subentries = subentries.values()
    for subentry in subentries:
        # ConfigSubentry names these fields subentry_id / subentry_type; the
        # older attribute names are kept as a fallback.
        port_id = getattr(subentry, "subentry_id", None) or getattr(
            subentry, "entry_id", ""
        )
        port_type = getattr(subentry, "subentry_type", None) or getattr(
            subentry, "type", None
        )
        ports.append(
            PortConfig(
                port_id=port_id,
                port_type=port_type,
                title=subentry.title or port_id,
                data=subentry.data,
                enabled=subentry.data.get(CONF_ENABLED),
            )
        )

    for port in entry.options.get(CONF_PORTS) or []:
        if not isinstance(port, dict):
            continue
        port_type = port.get("type")
        data = port.get("data", {})
        if port_type and isinstance(data, dict):
            ports.append(
                PortConfig(
                    port_id=port.get("id", ""),
                    port_type=port_type,
                    title=port.get("title", port.get("id", "Port")),
                    data=data,
                    enabled=port.get("enabled"),
                )
            )
    return ports
def _is_port_enabled(port: PortConfig, overrides: dict[str, Any]) -> bool:
if port.port_id and port.port_id in overrides:
return bool(overrides[port.port_id])
if port.enabled is None:
return True
return bool(port.enabled)
def iter_ports(entry: ConfigEntry) -> list[PortConfig]:
    """Return the port configurations of ``entry`` (public wrapper)."""
    ports = _iter_port_configs(entry)
    return ports