mirror of
https://github.com/bahmcloud/HA-KNX-Bridge.git
synced 2026-04-06 14:31:13 +00:00
1881 lines
67 KiB
Python
1881 lines
67 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
import asyncio
|
|
import logging
|
|
from typing import Any
|
|
|
|
from homeassistant.components.light import (
|
|
ATTR_BRIGHTNESS,
|
|
ATTR_COLOR_MODE,
|
|
ATTR_COLOR_TEMP,
|
|
ATTR_HS_COLOR,
|
|
ATTR_RGB_COLOR,
|
|
ATTR_RGBW_COLOR,
|
|
ATTR_XY_COLOR,
|
|
)
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.core import Event, HomeAssistant, State
|
|
from homeassistant.exceptions import ConfigEntryNotReady
|
|
from homeassistant.helpers import event as event_helper
|
|
from homeassistant.util import color as color_util
|
|
|
|
from .const import (
|
|
CONF_ANGLE_ADDRESS,
|
|
CONF_ANGLE_STATE_ADDRESS,
|
|
ADDRESS_EVENT_TYPE,
|
|
ADDRESS_VALUE_TYPE,
|
|
CONF_COMMAND_ADDRESS,
|
|
CONF_INVERT_OUTGOING,
|
|
CONF_LIGHT_ADDRESS,
|
|
CONF_LIGHT_STATE_ADDRESS,
|
|
CONF_LIGHT_BRIGHTNESS_ADDRESS,
|
|
CONF_LIGHT_BRIGHTNESS_STATE_ADDRESS,
|
|
CONF_LIGHT_COLOR_ADDRESS,
|
|
CONF_LIGHT_COLOR_STATE_ADDRESS,
|
|
CONF_LIGHT_RGBW_ADDRESS,
|
|
CONF_LIGHT_RGBW_STATE_ADDRESS,
|
|
CONF_LIGHT_HUE_ADDRESS,
|
|
CONF_LIGHT_HUE_STATE_ADDRESS,
|
|
CONF_LIGHT_SATURATION_ADDRESS,
|
|
CONF_LIGHT_SATURATION_STATE_ADDRESS,
|
|
CONF_LIGHT_XYY_ADDRESS,
|
|
CONF_LIGHT_XYY_STATE_ADDRESS,
|
|
CONF_LIGHT_COLOR_TEMPERATURE_ADDRESS,
|
|
CONF_LIGHT_COLOR_TEMPERATURE_STATE_ADDRESS,
|
|
CONF_LIGHT_COLOR_TEMPERATURE_MODE,
|
|
CONF_LIGHT_RELATIVE_COLOR_TEMPERATURE_ADDRESS,
|
|
CONF_LIGHT_RELATIVE_DIMMING_ADDRESS,
|
|
CONF_LIGHT_MIN_KELVIN,
|
|
CONF_LIGHT_MAX_KELVIN,
|
|
CONF_LIGHT_RED_ADDRESS,
|
|
CONF_LIGHT_RED_STATE_ADDRESS,
|
|
CONF_LIGHT_RED_BRIGHTNESS_ADDRESS,
|
|
CONF_LIGHT_RED_BRIGHTNESS_STATE_ADDRESS,
|
|
CONF_LIGHT_GREEN_ADDRESS,
|
|
CONF_LIGHT_GREEN_STATE_ADDRESS,
|
|
CONF_LIGHT_GREEN_BRIGHTNESS_ADDRESS,
|
|
CONF_LIGHT_GREEN_BRIGHTNESS_STATE_ADDRESS,
|
|
CONF_LIGHT_BLUE_ADDRESS,
|
|
CONF_LIGHT_BLUE_STATE_ADDRESS,
|
|
CONF_LIGHT_BLUE_BRIGHTNESS_ADDRESS,
|
|
CONF_LIGHT_BLUE_BRIGHTNESS_STATE_ADDRESS,
|
|
CONF_LIGHT_WHITE_ADDRESS,
|
|
CONF_LIGHT_WHITE_STATE_ADDRESS,
|
|
CONF_LIGHT_WHITE_BRIGHTNESS_ADDRESS,
|
|
CONF_LIGHT_WHITE_BRIGHTNESS_STATE_ADDRESS,
|
|
LIGHT_COLOR_TEMPERATURE_MODES,
|
|
CONF_MOVE_LONG_ADDRESS,
|
|
CONF_MOVE_SHORT_ADDRESS,
|
|
CONF_PORT_ENABLED,
|
|
CONF_PORTS,
|
|
CONF_POSITION_ADDRESS,
|
|
CONF_POSITION_STATE_ADDRESS,
|
|
CONF_STATE_ADDRESS,
|
|
CONF_STOP_ADDRESS,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
KNX_DOMAIN = "knx"
|
|
|
|
try:
|
|
from homeassistant.components.light import ATTR_COLOR_TEMP_KELVIN
|
|
except ImportError: # pragma: no cover - fallback for older HA
|
|
ATTR_COLOR_TEMP_KELVIN = "color_temp_kelvin"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class BinarySensorPort:
|
|
entity_id: str
|
|
state_address: str | None
|
|
state_invert_outgoing: bool
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CoverPort:
|
|
entity_id: str
|
|
move_long_address: str | None
|
|
move_short_address: str | None
|
|
stop_address: str | None
|
|
position_address: str | None
|
|
position_state_address: str | None
|
|
position_state_invert_outgoing: bool
|
|
angle_address: str | None
|
|
angle_state_address: str | None
|
|
angle_state_invert_outgoing: bool
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SwitchPort:
|
|
entity_id: str
|
|
command_address: str | None
|
|
state_address: str | None
|
|
state_invert_outgoing: bool
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AddressOptions:
|
|
value_type: str | None
|
|
invert_outgoing: bool
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class LightPort:
|
|
entity_id: str
|
|
address: str | None
|
|
state_address: str | None
|
|
state_invert_outgoing: bool
|
|
brightness_address: str | None
|
|
brightness_state_address: str | None
|
|
brightness_state_invert_outgoing: bool
|
|
color_address: str | None
|
|
color_state_address: str | None
|
|
rgbw_address: str | None
|
|
rgbw_state_address: str | None
|
|
hue_address: str | None
|
|
hue_state_address: str | None
|
|
saturation_address: str | None
|
|
saturation_state_address: str | None
|
|
saturation_state_invert_outgoing: bool
|
|
xyy_address: str | None
|
|
xyy_state_address: str | None
|
|
color_temperature_address: str | None
|
|
color_temperature_state_address: str | None
|
|
color_temperature_state_invert_outgoing: bool
|
|
color_temperature_mode: str
|
|
min_kelvin: int | None
|
|
max_kelvin: int | None
|
|
relative_color_temperature_address: str | None
|
|
relative_dimming_address: str | None
|
|
red_address: str | None
|
|
red_state_address: str | None
|
|
red_state_invert_outgoing: bool
|
|
red_brightness_address: str | None
|
|
red_brightness_state_address: str | None
|
|
red_brightness_state_invert_outgoing: bool
|
|
green_address: str | None
|
|
green_state_address: str | None
|
|
green_state_invert_outgoing: bool
|
|
green_brightness_address: str | None
|
|
green_brightness_state_address: str | None
|
|
green_brightness_state_invert_outgoing: bool
|
|
blue_address: str | None
|
|
blue_state_address: str | None
|
|
blue_state_invert_outgoing: bool
|
|
blue_brightness_address: str | None
|
|
blue_brightness_state_address: str | None
|
|
blue_brightness_state_invert_outgoing: bool
|
|
white_address: str | None
|
|
white_state_address: str | None
|
|
white_state_invert_outgoing: bool
|
|
white_brightness_address: str | None
|
|
white_brightness_state_address: str | None
|
|
white_brightness_state_invert_outgoing: bool
|
|
|
|
|
|
class BridgeManager:
|
|
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
|
|
self.hass = hass
|
|
self.entry = entry
|
|
self._unsub_listeners: list[callable] = []
|
|
self._knx_event_unsub: callable | None = None
|
|
self._address_handlers: dict[str, callable[[Event], Any]] = {}
|
|
self._registered_addresses: list[tuple[str, str | None]] = []
|
|
self._address_options: dict[str, AddressOptions] = {}
|
|
self._light_components: dict[str, dict[str, int]] = {}
|
|
self._light_dimming_tasks: dict[str, asyncio.Task] = {}
|
|
self._light_ct_tasks: dict[str, asyncio.Task] = {}
|
|
|
|
async def async_setup(self) -> None:
|
|
if not self.hass.services.has_service(KNX_DOMAIN, "send"):
|
|
raise ConfigEntryNotReady("KNX integration services not available")
|
|
|
|
binary_ports, switch_ports, cover_ports, light_ports = self._load_ports()
|
|
self._register_outgoing(binary_ports, switch_ports, cover_ports, light_ports)
|
|
await self._register_incoming(switch_ports, cover_ports, light_ports)
|
|
|
|
async def async_unload(self) -> None:
|
|
for unsub in self._unsub_listeners:
|
|
unsub()
|
|
self._unsub_listeners.clear()
|
|
|
|
if self._knx_event_unsub is not None:
|
|
self._knx_event_unsub()
|
|
self._knx_event_unsub = None
|
|
|
|
for task in self._light_dimming_tasks.values():
|
|
task.cancel()
|
|
self._light_dimming_tasks.clear()
|
|
for task in self._light_ct_tasks.values():
|
|
task.cancel()
|
|
self._light_ct_tasks.clear()
|
|
|
|
await self._unregister_knx_events()
|
|
|
|
def _load_ports(
|
|
self,
|
|
) -> tuple[
|
|
list[BinarySensorPort],
|
|
list[SwitchPort],
|
|
list[CoverPort],
|
|
list[LightPort],
|
|
]:
|
|
binary_ports: list[BinarySensorPort] = []
|
|
switch_ports: list[SwitchPort] = []
|
|
cover_ports: list[CoverPort] = []
|
|
light_ports: list[LightPort] = []
|
|
|
|
enabled_overrides = dict(self.entry.options.get(CONF_PORT_ENABLED, {}))
|
|
for port in _iter_port_configs(self.entry):
|
|
port_type = port.port_type
|
|
data = port.data
|
|
if not _is_port_enabled(port, enabled_overrides):
|
|
continue
|
|
if port_type == "binary_sensor":
|
|
binary_ports.append(
|
|
BinarySensorPort(
|
|
entity_id=data["entity_id"],
|
|
state_address=_clean_address(data.get(CONF_STATE_ADDRESS)),
|
|
state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_STATE_ADDRESS))
|
|
),
|
|
)
|
|
)
|
|
elif port_type == "switch":
|
|
switch_ports.append(
|
|
SwitchPort(
|
|
entity_id=data["entity_id"],
|
|
command_address=_clean_address(
|
|
data.get(CONF_COMMAND_ADDRESS)
|
|
),
|
|
state_address=_clean_address(data.get(CONF_STATE_ADDRESS)),
|
|
state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_STATE_ADDRESS))
|
|
),
|
|
)
|
|
)
|
|
elif port_type == "cover":
|
|
cover_ports.append(
|
|
CoverPort(
|
|
entity_id=data["entity_id"],
|
|
move_long_address=_clean_address(data.get(CONF_MOVE_LONG_ADDRESS)),
|
|
move_short_address=_clean_address(data.get(CONF_MOVE_SHORT_ADDRESS)),
|
|
stop_address=_clean_address(data.get(CONF_STOP_ADDRESS)),
|
|
position_address=_clean_address(data.get(CONF_POSITION_ADDRESS)),
|
|
position_state_address=_clean_address(
|
|
data.get(CONF_POSITION_STATE_ADDRESS)
|
|
),
|
|
position_state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_POSITION_STATE_ADDRESS))
|
|
),
|
|
angle_address=_clean_address(data.get(CONF_ANGLE_ADDRESS)),
|
|
angle_state_address=_clean_address(
|
|
data.get(CONF_ANGLE_STATE_ADDRESS)
|
|
),
|
|
angle_state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_ANGLE_STATE_ADDRESS))
|
|
),
|
|
)
|
|
)
|
|
elif port_type == "light":
|
|
color_temp_mode = str(
|
|
data.get(CONF_LIGHT_COLOR_TEMPERATURE_MODE, "absolute")
|
|
)
|
|
if color_temp_mode not in LIGHT_COLOR_TEMPERATURE_MODES:
|
|
color_temp_mode = "absolute"
|
|
light_ports.append(
|
|
LightPort(
|
|
entity_id=data["entity_id"],
|
|
address=_clean_address(data.get(CONF_LIGHT_ADDRESS)),
|
|
state_address=_clean_address(
|
|
data.get(CONF_LIGHT_STATE_ADDRESS)
|
|
),
|
|
state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_LIGHT_STATE_ADDRESS))
|
|
),
|
|
brightness_address=_clean_address(
|
|
data.get(CONF_LIGHT_BRIGHTNESS_ADDRESS)
|
|
),
|
|
brightness_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_BRIGHTNESS_STATE_ADDRESS)
|
|
),
|
|
brightness_state_invert_outgoing=_clean_bool(
|
|
data.get(
|
|
_invert_out_key(CONF_LIGHT_BRIGHTNESS_STATE_ADDRESS)
|
|
)
|
|
),
|
|
color_address=_clean_address(data.get(CONF_LIGHT_COLOR_ADDRESS)),
|
|
color_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_COLOR_STATE_ADDRESS)
|
|
),
|
|
rgbw_address=_clean_address(data.get(CONF_LIGHT_RGBW_ADDRESS)),
|
|
rgbw_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_RGBW_STATE_ADDRESS)
|
|
),
|
|
hue_address=_clean_address(data.get(CONF_LIGHT_HUE_ADDRESS)),
|
|
hue_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_HUE_STATE_ADDRESS)
|
|
),
|
|
saturation_address=_clean_address(
|
|
data.get(CONF_LIGHT_SATURATION_ADDRESS)
|
|
),
|
|
saturation_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_SATURATION_STATE_ADDRESS)
|
|
),
|
|
saturation_state_invert_outgoing=_clean_bool(
|
|
data.get(
|
|
_invert_out_key(
|
|
CONF_LIGHT_SATURATION_STATE_ADDRESS
|
|
)
|
|
)
|
|
),
|
|
xyy_address=_clean_address(data.get(CONF_LIGHT_XYY_ADDRESS)),
|
|
xyy_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_XYY_STATE_ADDRESS)
|
|
),
|
|
color_temperature_address=_clean_address(
|
|
data.get(CONF_LIGHT_COLOR_TEMPERATURE_ADDRESS)
|
|
),
|
|
color_temperature_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_COLOR_TEMPERATURE_STATE_ADDRESS)
|
|
),
|
|
color_temperature_state_invert_outgoing=_clean_bool(
|
|
data.get(
|
|
_invert_out_key(
|
|
CONF_LIGHT_COLOR_TEMPERATURE_STATE_ADDRESS
|
|
)
|
|
)
|
|
),
|
|
color_temperature_mode=color_temp_mode,
|
|
min_kelvin=_clean_int(data.get(CONF_LIGHT_MIN_KELVIN)),
|
|
max_kelvin=_clean_int(data.get(CONF_LIGHT_MAX_KELVIN)),
|
|
relative_color_temperature_address=_clean_address(
|
|
data.get(CONF_LIGHT_RELATIVE_COLOR_TEMPERATURE_ADDRESS)
|
|
),
|
|
relative_dimming_address=_clean_address(
|
|
data.get(CONF_LIGHT_RELATIVE_DIMMING_ADDRESS)
|
|
),
|
|
red_address=_clean_address(data.get(CONF_LIGHT_RED_ADDRESS)),
|
|
red_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_RED_STATE_ADDRESS)
|
|
),
|
|
red_state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_LIGHT_RED_STATE_ADDRESS))
|
|
),
|
|
red_brightness_address=_clean_address(
|
|
data.get(CONF_LIGHT_RED_BRIGHTNESS_ADDRESS)
|
|
),
|
|
red_brightness_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_RED_BRIGHTNESS_STATE_ADDRESS)
|
|
),
|
|
red_brightness_state_invert_outgoing=_clean_bool(
|
|
data.get(
|
|
_invert_out_key(
|
|
CONF_LIGHT_RED_BRIGHTNESS_STATE_ADDRESS
|
|
)
|
|
)
|
|
),
|
|
green_address=_clean_address(
|
|
data.get(CONF_LIGHT_GREEN_ADDRESS)
|
|
),
|
|
green_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_GREEN_STATE_ADDRESS)
|
|
),
|
|
green_state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_LIGHT_GREEN_STATE_ADDRESS))
|
|
),
|
|
green_brightness_address=_clean_address(
|
|
data.get(CONF_LIGHT_GREEN_BRIGHTNESS_ADDRESS)
|
|
),
|
|
green_brightness_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_GREEN_BRIGHTNESS_STATE_ADDRESS)
|
|
),
|
|
green_brightness_state_invert_outgoing=_clean_bool(
|
|
data.get(
|
|
_invert_out_key(
|
|
CONF_LIGHT_GREEN_BRIGHTNESS_STATE_ADDRESS
|
|
)
|
|
)
|
|
),
|
|
blue_address=_clean_address(data.get(CONF_LIGHT_BLUE_ADDRESS)),
|
|
blue_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_BLUE_STATE_ADDRESS)
|
|
),
|
|
blue_state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_LIGHT_BLUE_STATE_ADDRESS))
|
|
),
|
|
blue_brightness_address=_clean_address(
|
|
data.get(CONF_LIGHT_BLUE_BRIGHTNESS_ADDRESS)
|
|
),
|
|
blue_brightness_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_BLUE_BRIGHTNESS_STATE_ADDRESS)
|
|
),
|
|
blue_brightness_state_invert_outgoing=_clean_bool(
|
|
data.get(
|
|
_invert_out_key(
|
|
CONF_LIGHT_BLUE_BRIGHTNESS_STATE_ADDRESS
|
|
)
|
|
)
|
|
),
|
|
white_address=_clean_address(
|
|
data.get(CONF_LIGHT_WHITE_ADDRESS)
|
|
),
|
|
white_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_WHITE_STATE_ADDRESS)
|
|
),
|
|
white_state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_LIGHT_WHITE_STATE_ADDRESS))
|
|
),
|
|
white_brightness_address=_clean_address(
|
|
data.get(CONF_LIGHT_WHITE_BRIGHTNESS_ADDRESS)
|
|
),
|
|
white_brightness_state_address=_clean_address(
|
|
data.get(CONF_LIGHT_WHITE_BRIGHTNESS_STATE_ADDRESS)
|
|
),
|
|
white_brightness_state_invert_outgoing=_clean_bool(
|
|
data.get(
|
|
_invert_out_key(
|
|
CONF_LIGHT_WHITE_BRIGHTNESS_STATE_ADDRESS
|
|
)
|
|
)
|
|
),
|
|
)
|
|
)
|
|
|
|
return binary_ports, switch_ports, cover_ports, light_ports
|
|
|
|
def _register_outgoing(
|
|
self,
|
|
binary_ports: list[BinarySensorPort],
|
|
switch_ports: list[SwitchPort],
|
|
cover_ports: list[CoverPort],
|
|
light_ports: list[LightPort],
|
|
) -> None:
|
|
for port in binary_ports:
|
|
if not port.state_address:
|
|
continue
|
|
self._remember_address_options(
|
|
port.state_address,
|
|
_get_value_type(CONF_STATE_ADDRESS),
|
|
port.state_invert_outgoing,
|
|
)
|
|
self._unsub_listeners.append(
|
|
event_helper.async_track_state_change_event(
|
|
self.hass, [port.entity_id], self._binary_sensor_changed(port)
|
|
)
|
|
)
|
|
|
|
for port in switch_ports:
|
|
if not port.state_address:
|
|
continue
|
|
self._remember_address_options(
|
|
port.state_address,
|
|
_get_value_type(CONF_STATE_ADDRESS),
|
|
port.state_invert_outgoing,
|
|
)
|
|
self._unsub_listeners.append(
|
|
event_helper.async_track_state_change_event(
|
|
self.hass, [port.entity_id], self._switch_changed(port)
|
|
)
|
|
)
|
|
|
|
for port in cover_ports:
|
|
if not (port.position_state_address or port.angle_state_address):
|
|
continue
|
|
if port.position_state_address:
|
|
self._remember_address_options(
|
|
port.position_state_address,
|
|
_get_value_type(CONF_POSITION_STATE_ADDRESS),
|
|
port.position_state_invert_outgoing,
|
|
)
|
|
if port.angle_state_address:
|
|
self._remember_address_options(
|
|
port.angle_state_address,
|
|
_get_value_type(CONF_ANGLE_STATE_ADDRESS),
|
|
port.angle_state_invert_outgoing,
|
|
)
|
|
self._unsub_listeners.append(
|
|
event_helper.async_track_state_change_event(
|
|
self.hass, [port.entity_id], self._cover_changed(port)
|
|
)
|
|
)
|
|
|
|
for port in light_ports:
|
|
if port.state_address:
|
|
self._remember_address_options(
|
|
port.state_address,
|
|
_get_value_type(CONF_LIGHT_STATE_ADDRESS),
|
|
port.state_invert_outgoing,
|
|
)
|
|
if port.brightness_state_address:
|
|
self._remember_address_options(
|
|
port.brightness_state_address,
|
|
_get_value_type(CONF_LIGHT_BRIGHTNESS_STATE_ADDRESS),
|
|
port.brightness_state_invert_outgoing,
|
|
)
|
|
if port.saturation_state_address:
|
|
self._remember_address_options(
|
|
port.saturation_state_address,
|
|
_get_value_type(CONF_LIGHT_SATURATION_STATE_ADDRESS),
|
|
port.saturation_state_invert_outgoing,
|
|
)
|
|
if (
|
|
port.color_temperature_state_address
|
|
and port.color_temperature_mode == "relative"
|
|
):
|
|
self._remember_address_options(
|
|
port.color_temperature_state_address,
|
|
"percent",
|
|
port.color_temperature_state_invert_outgoing,
|
|
)
|
|
if port.red_state_address:
|
|
self._remember_address_options(
|
|
port.red_state_address,
|
|
_get_value_type(CONF_LIGHT_RED_STATE_ADDRESS),
|
|
port.red_state_invert_outgoing,
|
|
)
|
|
if port.red_brightness_state_address:
|
|
self._remember_address_options(
|
|
port.red_brightness_state_address,
|
|
_get_value_type(CONF_LIGHT_RED_BRIGHTNESS_STATE_ADDRESS),
|
|
port.red_brightness_state_invert_outgoing,
|
|
)
|
|
if port.green_state_address:
|
|
self._remember_address_options(
|
|
port.green_state_address,
|
|
_get_value_type(CONF_LIGHT_GREEN_STATE_ADDRESS),
|
|
port.green_state_invert_outgoing,
|
|
)
|
|
if port.green_brightness_state_address:
|
|
self._remember_address_options(
|
|
port.green_brightness_state_address,
|
|
_get_value_type(CONF_LIGHT_GREEN_BRIGHTNESS_STATE_ADDRESS),
|
|
port.green_brightness_state_invert_outgoing,
|
|
)
|
|
if port.blue_state_address:
|
|
self._remember_address_options(
|
|
port.blue_state_address,
|
|
_get_value_type(CONF_LIGHT_BLUE_STATE_ADDRESS),
|
|
port.blue_state_invert_outgoing,
|
|
)
|
|
if port.blue_brightness_state_address:
|
|
self._remember_address_options(
|
|
port.blue_brightness_state_address,
|
|
_get_value_type(CONF_LIGHT_BLUE_BRIGHTNESS_STATE_ADDRESS),
|
|
port.blue_brightness_state_invert_outgoing,
|
|
)
|
|
if port.white_state_address:
|
|
self._remember_address_options(
|
|
port.white_state_address,
|
|
_get_value_type(CONF_LIGHT_WHITE_STATE_ADDRESS),
|
|
port.white_state_invert_outgoing,
|
|
)
|
|
if port.white_brightness_state_address:
|
|
self._remember_address_options(
|
|
port.white_brightness_state_address,
|
|
_get_value_type(CONF_LIGHT_WHITE_BRIGHTNESS_STATE_ADDRESS),
|
|
port.white_brightness_state_invert_outgoing,
|
|
)
|
|
self._unsub_listeners.append(
|
|
event_helper.async_track_state_change_event(
|
|
self.hass, [port.entity_id], self._light_changed(port)
|
|
)
|
|
)
|
|
|
|
async def _register_incoming(
|
|
self,
|
|
switch_ports: list[SwitchPort],
|
|
cover_ports: list[CoverPort],
|
|
light_ports: list[LightPort],
|
|
) -> None:
|
|
for port in switch_ports:
|
|
self._register_knx_switch_address(
|
|
port.command_address,
|
|
port,
|
|
)
|
|
|
|
for port in cover_ports:
|
|
self._register_knx_address(
|
|
port.move_long_address,
|
|
CONF_MOVE_LONG_ADDRESS,
|
|
port,
|
|
"move_long",
|
|
)
|
|
self._register_knx_address(
|
|
port.move_short_address,
|
|
CONF_MOVE_SHORT_ADDRESS,
|
|
port,
|
|
"move_short",
|
|
)
|
|
self._register_knx_address(
|
|
port.stop_address,
|
|
CONF_STOP_ADDRESS,
|
|
port,
|
|
"stop",
|
|
)
|
|
self._register_knx_address(
|
|
port.position_address,
|
|
CONF_POSITION_ADDRESS,
|
|
port,
|
|
"position",
|
|
)
|
|
self._register_knx_address(
|
|
port.angle_address,
|
|
CONF_ANGLE_ADDRESS,
|
|
port,
|
|
"angle",
|
|
)
|
|
|
|
for port in light_ports:
|
|
self._register_knx_light_command(
|
|
port.address,
|
|
_get_event_type(CONF_LIGHT_ADDRESS),
|
|
port,
|
|
"switch",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.brightness_address,
|
|
_get_event_type(CONF_LIGHT_BRIGHTNESS_ADDRESS),
|
|
port,
|
|
"brightness",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.color_address,
|
|
_get_event_type(CONF_LIGHT_COLOR_ADDRESS),
|
|
port,
|
|
"color",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.rgbw_address,
|
|
_get_event_type(CONF_LIGHT_RGBW_ADDRESS),
|
|
port,
|
|
"rgbw",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.hue_address,
|
|
_get_event_type(CONF_LIGHT_HUE_ADDRESS),
|
|
port,
|
|
"hue",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.saturation_address,
|
|
_get_event_type(CONF_LIGHT_SATURATION_ADDRESS),
|
|
port,
|
|
"saturation",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.xyy_address,
|
|
_get_event_type(CONF_LIGHT_XYY_ADDRESS),
|
|
port,
|
|
"xyy",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.color_temperature_address,
|
|
_light_color_temperature_event_type(port.color_temperature_mode),
|
|
port,
|
|
"color_temperature",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.relative_color_temperature_address,
|
|
None,
|
|
port,
|
|
"relative_color_temperature",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.relative_dimming_address,
|
|
None,
|
|
port,
|
|
"relative_dimming",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.red_address,
|
|
_get_event_type(CONF_LIGHT_RED_ADDRESS),
|
|
port,
|
|
"red_switch",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.red_brightness_address,
|
|
_get_event_type(CONF_LIGHT_RED_BRIGHTNESS_ADDRESS),
|
|
port,
|
|
"red_brightness",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.green_address,
|
|
_get_event_type(CONF_LIGHT_GREEN_ADDRESS),
|
|
port,
|
|
"green_switch",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.green_brightness_address,
|
|
_get_event_type(CONF_LIGHT_GREEN_BRIGHTNESS_ADDRESS),
|
|
port,
|
|
"green_brightness",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.blue_address,
|
|
_get_event_type(CONF_LIGHT_BLUE_ADDRESS),
|
|
port,
|
|
"blue_switch",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.blue_brightness_address,
|
|
_get_event_type(CONF_LIGHT_BLUE_BRIGHTNESS_ADDRESS),
|
|
port,
|
|
"blue_brightness",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.white_address,
|
|
_get_event_type(CONF_LIGHT_WHITE_ADDRESS),
|
|
port,
|
|
"white_switch",
|
|
)
|
|
self._register_knx_light_command(
|
|
port.white_brightness_address,
|
|
_get_event_type(CONF_LIGHT_WHITE_BRIGHTNESS_ADDRESS),
|
|
port,
|
|
"white_brightness",
|
|
)
|
|
|
|
if self._address_handlers:
|
|
if hasattr(event_helper, "async_track_event"):
|
|
self._knx_event_unsub = event_helper.async_track_event(
|
|
self.hass, "knx_event", self._handle_knx_event
|
|
)
|
|
else:
|
|
self._knx_event_unsub = self.hass.bus.async_listen(
|
|
"knx_event", self._handle_knx_event
|
|
)
|
|
|
|
await self._register_knx_events()
|
|
|
|
def _register_knx_address(
|
|
self,
|
|
address: str | None,
|
|
address_key: str,
|
|
port: CoverPort,
|
|
action: str,
|
|
) -> None:
|
|
if not address:
|
|
return
|
|
|
|
self._address_handlers[address] = lambda event: self._handle_cover_action(
|
|
port, action, event
|
|
)
|
|
value_type = _get_value_type(address_key)
|
|
event_type = _get_event_type(address_key)
|
|
self._remember_address_options(
|
|
address, value_type, False
|
|
)
|
|
self._registered_addresses.append((address, event_type))
|
|
|
|
def _register_knx_switch_address(
|
|
self,
|
|
address: str | None,
|
|
port: SwitchPort,
|
|
) -> None:
|
|
if not address:
|
|
return
|
|
self._address_handlers[address] = lambda event: self._handle_switch_action(
|
|
port, event
|
|
)
|
|
self._remember_address_options(
|
|
address,
|
|
_get_value_type(CONF_COMMAND_ADDRESS),
|
|
False,
|
|
)
|
|
self._registered_addresses.append((address, _get_event_type(CONF_COMMAND_ADDRESS)))
|
|
|
|
def _register_knx_light_command(
|
|
self,
|
|
address: str | None,
|
|
event_type: str | None,
|
|
port: LightPort,
|
|
action: str,
|
|
) -> None:
|
|
if not address:
|
|
return
|
|
self._address_handlers[address] = lambda event: self._handle_light_action(
|
|
port, action, event
|
|
)
|
|
self._registered_addresses.append((address, event_type))
|
|
|
|
def _remember_address_options(
|
|
self,
|
|
address: str,
|
|
value_type: str | None,
|
|
invert_outgoing: bool,
|
|
) -> None:
|
|
self._address_options[address] = AddressOptions(
|
|
value_type=value_type,
|
|
invert_outgoing=invert_outgoing,
|
|
)
|
|
|
|
async def _register_knx_events(self) -> None:
|
|
for address, event_type in self._registered_addresses:
|
|
data: dict[str, Any] = {"address": address}
|
|
if event_type:
|
|
data["type"] = event_type
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN, "event_register", data, blocking=False
|
|
)
|
|
|
|
async def _unregister_knx_events(self) -> None:
|
|
for address, event_type in self._registered_addresses:
|
|
data: dict[str, Any] = {"address": address, "remove": True}
|
|
if event_type:
|
|
data["type"] = event_type
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN, "event_register", data, blocking=False
|
|
)
|
|
self._registered_addresses.clear()
|
|
|
|
def _binary_sensor_changed(self, port: BinarySensorPort) -> callable[[Event], Any]:
|
|
async def _handler(event: Event) -> None:
|
|
new_state: State | None = event.data.get("new_state")
|
|
if new_state is None:
|
|
return
|
|
if new_state.state not in ("on", "off"):
|
|
return
|
|
payload = 1 if new_state.state == "on" else 0
|
|
payload = _invert_value(
|
|
payload, port.state_address, self._address_options
|
|
)
|
|
await self._knx_send_raw(port.state_address, payload)
|
|
|
|
return _handler
|
|
|
|
def _switch_changed(self, port: SwitchPort) -> callable[[Event], Any]:
|
|
async def _handler(event: Event) -> None:
|
|
new_state: State | None = event.data.get("new_state")
|
|
if new_state is None:
|
|
return
|
|
if new_state.state not in ("on", "off"):
|
|
return
|
|
payload = 1 if new_state.state == "on" else 0
|
|
payload = _invert_value(
|
|
payload, port.state_address, self._address_options
|
|
)
|
|
await self._knx_send_raw(port.state_address, payload)
|
|
|
|
return _handler
|
|
|
|
def _cover_changed(self, port: CoverPort) -> callable[[Event], Any]:
|
|
async def _handler(event: Event) -> None:
|
|
new_state: State | None = event.data.get("new_state")
|
|
if new_state is None:
|
|
return
|
|
|
|
if port.position_state_address is not None:
|
|
position = new_state.attributes.get("current_position")
|
|
if position is not None:
|
|
position = _invert_value(
|
|
position,
|
|
port.position_state_address,
|
|
self._address_options,
|
|
)
|
|
await self._knx_send_percent(
|
|
port.position_state_address, position
|
|
)
|
|
|
|
if port.angle_state_address is not None:
|
|
angle = new_state.attributes.get("current_tilt_position")
|
|
if angle is not None:
|
|
angle = _invert_value(
|
|
angle,
|
|
port.angle_state_address,
|
|
self._address_options,
|
|
)
|
|
await self._knx_send_percent(port.angle_state_address, angle)
|
|
|
|
return _handler
|
|
|
|
def _light_changed(self, port: LightPort) -> callable[[Event], Any]:
|
|
async def _handler(event: Event) -> None:
|
|
new_state: State | None = event.data.get("new_state")
|
|
if new_state is None:
|
|
return
|
|
if new_state.state not in ("on", "off"):
|
|
return
|
|
|
|
is_on = new_state.state == "on"
|
|
attrs = new_state.attributes
|
|
brightness = attrs.get(ATTR_BRIGHTNESS)
|
|
brightness_percent = (
|
|
_brightness_to_percent(brightness) if brightness is not None else None
|
|
)
|
|
hs_color = attrs.get(ATTR_HS_COLOR)
|
|
rgb_color = attrs.get(ATTR_RGB_COLOR)
|
|
rgbw_color = attrs.get(ATTR_RGBW_COLOR)
|
|
xy_color = attrs.get(ATTR_XY_COLOR)
|
|
color_temp_kelvin = attrs.get(ATTR_COLOR_TEMP_KELVIN)
|
|
color_temp_mired = attrs.get(ATTR_COLOR_TEMP)
|
|
|
|
if color_temp_kelvin is None and color_temp_mired is not None:
|
|
color_temp_kelvin = _mireds_to_kelvin(color_temp_mired)
|
|
|
|
if port.state_address:
|
|
payload = 1 if is_on else 0
|
|
payload = _invert_value(
|
|
payload, port.state_address, self._address_options
|
|
)
|
|
await self._knx_send_raw(port.state_address, payload)
|
|
|
|
if port.brightness_state_address:
|
|
if is_on:
|
|
if brightness_percent is not None:
|
|
payload = brightness_percent
|
|
payload = _invert_value(
|
|
payload,
|
|
port.brightness_state_address,
|
|
self._address_options,
|
|
)
|
|
await self._knx_send_percent(
|
|
port.brightness_state_address, payload
|
|
)
|
|
else:
|
|
payload = 0
|
|
payload = _invert_value(
|
|
payload,
|
|
port.brightness_state_address,
|
|
self._address_options,
|
|
)
|
|
await self._knx_send_percent(
|
|
port.brightness_state_address, payload
|
|
)
|
|
|
|
if rgb_color is None and hs_color is not None:
|
|
rgb_color = color_util.color_hs_to_RGB(*hs_color)
|
|
if rgb_color is None and xy_color is not None:
|
|
rgb_color = color_util.color_xy_to_RGB(
|
|
xy_color[0], xy_color[1], brightness or 255
|
|
)
|
|
|
|
if port.color_state_address and rgb_color is not None:
|
|
await self._knx_send_typed(
|
|
port.color_state_address, list(rgb_color), "232.600"
|
|
)
|
|
|
|
if port.rgbw_state_address and rgbw_color is not None:
|
|
await self._knx_send_typed(
|
|
port.rgbw_state_address, list(rgbw_color), "251.600"
|
|
)
|
|
|
|
if port.hue_state_address and hs_color is not None:
|
|
await self._knx_send_typed(
|
|
port.hue_state_address, int(round(hs_color[0])), "5.003"
|
|
)
|
|
|
|
if port.saturation_state_address and hs_color is not None:
|
|
payload = int(round(hs_color[1]))
|
|
payload = _invert_value(
|
|
payload, port.saturation_state_address, self._address_options
|
|
)
|
|
await self._knx_send_percent(
|
|
port.saturation_state_address, payload
|
|
)
|
|
|
|
if port.xyy_state_address and xy_color is not None:
|
|
if brightness_percent is None:
|
|
brightness_value = 100 if is_on else 0
|
|
else:
|
|
brightness_value = brightness_percent
|
|
await self._knx_send_typed(
|
|
port.xyy_state_address,
|
|
[float(xy_color[0]), float(xy_color[1]), int(brightness_value)],
|
|
"242.600",
|
|
)
|
|
|
|
if port.color_temperature_state_address and color_temp_kelvin is not None:
|
|
payload, dpt_type = _color_temperature_payload(
|
|
color_temp_kelvin,
|
|
port,
|
|
clamp=True,
|
|
)
|
|
if port.color_temperature_mode == "relative":
|
|
payload = _invert_value(
|
|
int(round(payload)),
|
|
port.color_temperature_state_address,
|
|
self._address_options,
|
|
)
|
|
await self._knx_send_typed(
|
|
port.color_temperature_state_address, payload, dpt_type
|
|
)
|
|
|
|
self._update_light_components_from_state(
|
|
port,
|
|
is_on,
|
|
brightness,
|
|
rgb_color,
|
|
rgbw_color,
|
|
)
|
|
|
|
await self._send_light_component_states(port)
|
|
|
|
return _handler
|
|
|
|
async def _handle_knx_event(self, event: Event) -> None:
|
|
direction = event.data.get("direction")
|
|
if direction is not None and not str(direction).lower().startswith("incoming"):
|
|
return
|
|
|
|
destination = _event_destination(event)
|
|
if not destination:
|
|
return
|
|
|
|
handler = self._address_handlers.get(destination)
|
|
if handler is None:
|
|
return
|
|
|
|
await handler(event)
|
|
|
|
async def _handle_cover_action(
|
|
self, port: CoverPort, action: str, event: Event
|
|
) -> None:
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
destination = _event_destination(event)
|
|
value = _invert_value(value, destination, self._address_options)
|
|
|
|
if action == "move_long":
|
|
if value == 0:
|
|
await self._call_cover_service(port.entity_id, "open_cover")
|
|
elif value == 1:
|
|
await self._call_cover_service(port.entity_id, "close_cover")
|
|
elif action == "move_short":
|
|
if value in (0, 1):
|
|
await self._call_cover_service(port.entity_id, "stop_cover")
|
|
elif action == "stop":
|
|
if value in (0, 1):
|
|
await self._call_cover_service(port.entity_id, "stop_cover")
|
|
elif action == "position":
|
|
await self._call_cover_service(
|
|
port.entity_id, "set_cover_position", {"position": value}
|
|
)
|
|
elif action == "angle":
|
|
await self._call_cover_service(
|
|
port.entity_id, "set_cover_tilt_position", {"tilt_position": value}
|
|
)
|
|
|
|
async def _handle_switch_action(self, port: SwitchPort, event: Event) -> None:
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
destination = _event_destination(event)
|
|
value = _invert_value(value, destination, self._address_options)
|
|
if value == 0:
|
|
await self._call_switch_service(port.entity_id, "turn_off")
|
|
elif value == 1:
|
|
await self._call_switch_service(port.entity_id, "turn_on")
|
|
|
|
async def _handle_light_action(
|
|
self, port: LightPort, action: str, event: Event
|
|
) -> None:
|
|
if action == "switch":
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
if value == 0:
|
|
await self._call_light_service(port.entity_id, "turn_off")
|
|
elif value == 1:
|
|
await self._call_light_service(port.entity_id, "turn_on")
|
|
return
|
|
|
|
if action == "brightness":
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
value = _clamp_percent(value)
|
|
if value == 0:
|
|
await self._call_light_service(port.entity_id, "turn_off")
|
|
else:
|
|
await self._call_light_service(
|
|
port.entity_id,
|
|
"turn_on",
|
|
{"brightness": _percent_to_brightness(value)},
|
|
)
|
|
return
|
|
|
|
if action == "color":
|
|
rgb = _extract_rgb_value(event)
|
|
if rgb is None:
|
|
return
|
|
await self._call_light_service(
|
|
port.entity_id, "turn_on", {"rgb_color": rgb}
|
|
)
|
|
return
|
|
|
|
if action == "rgbw":
|
|
rgbw = _extract_rgbw_value(event)
|
|
if rgbw is None:
|
|
return
|
|
await self._call_light_service(
|
|
port.entity_id, "turn_on", {"rgbw_color": rgbw}
|
|
)
|
|
return
|
|
|
|
if action == "hue":
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
hs_color = _current_hs_color(self.hass, port.entity_id)
|
|
saturation = 0 if hs_color is None else hs_color[1]
|
|
await self._call_light_service(
|
|
port.entity_id,
|
|
"turn_on",
|
|
{"hs_color": (float(value), float(saturation))},
|
|
)
|
|
return
|
|
|
|
if action == "saturation":
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
value = _clamp_percent(value)
|
|
hs_color = _current_hs_color(self.hass, port.entity_id)
|
|
hue = 0 if hs_color is None else hs_color[0]
|
|
await self._call_light_service(
|
|
port.entity_id,
|
|
"turn_on",
|
|
{"hs_color": (float(hue), float(value))},
|
|
)
|
|
return
|
|
|
|
if action == "xyy":
|
|
xyy = _extract_xyy_value(event)
|
|
if xyy is None:
|
|
return
|
|
x, y, luminance = xyy
|
|
data: dict[str, Any] = {"xy_color": (float(x), float(y))}
|
|
if luminance is not None:
|
|
lum = _clamp_percent(int(round(luminance)))
|
|
if lum == 0:
|
|
await self._call_light_service(port.entity_id, "turn_off")
|
|
return
|
|
data["brightness"] = _percent_to_brightness(lum)
|
|
await self._call_light_service(port.entity_id, "turn_on", data)
|
|
return
|
|
|
|
if action == "color_temperature":
|
|
value = _extract_float_value(event)
|
|
if value is None:
|
|
return
|
|
kelvin = _color_temperature_from_value(value, port)
|
|
if kelvin is None:
|
|
return
|
|
await self._call_light_service(
|
|
port.entity_id,
|
|
"turn_on",
|
|
{"color_temp": _kelvin_to_mireds(kelvin)},
|
|
)
|
|
return
|
|
|
|
if action == "relative_dimming":
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
if value in (0, 8):
|
|
self._stop_light_dimming(port.entity_id)
|
|
return
|
|
step = _relative_dimming_step(value)
|
|
if step is None:
|
|
return
|
|
direction, percent = step
|
|
if percent <= 0:
|
|
return
|
|
self._start_light_dimming(port.entity_id, direction, percent)
|
|
return
|
|
|
|
if action == "relative_color_temperature":
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
if value in (0, 8):
|
|
self._stop_light_ct_adjust(port.entity_id)
|
|
return
|
|
step = _relative_dimming_step(value)
|
|
if step is None:
|
|
return
|
|
direction, percent = step
|
|
if percent <= 0:
|
|
return
|
|
self._start_light_ct_adjust(port, direction, percent)
|
|
return
|
|
|
|
if action.startswith("red_"):
|
|
await self._handle_light_component_action(port, "red", action, event)
|
|
elif action.startswith("green_"):
|
|
await self._handle_light_component_action(port, "green", action, event)
|
|
elif action.startswith("blue_"):
|
|
await self._handle_light_component_action(port, "blue", action, event)
|
|
elif action.startswith("white_"):
|
|
await self._handle_light_component_action(port, "white", action, event)
|
|
|
|
async def _call_cover_service(
|
|
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
|
|
) -> None:
|
|
data = {"entity_id": entity_id}
|
|
if service_data:
|
|
data.update(service_data)
|
|
await self.hass.services.async_call(
|
|
"cover", service, data, blocking=False
|
|
)
|
|
|
|
async def _call_switch_service(
|
|
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
|
|
) -> None:
|
|
data = {"entity_id": entity_id}
|
|
if service_data:
|
|
data.update(service_data)
|
|
await self.hass.services.async_call(
|
|
"switch", service, data, blocking=False
|
|
)
|
|
|
|
async def _call_light_service(
|
|
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
|
|
) -> None:
|
|
data = {"entity_id": entity_id}
|
|
if service_data:
|
|
data.update(service_data)
|
|
await self.hass.services.async_call("light", service, data, blocking=False)
|
|
|
|
async def _knx_send_raw(self, address: str | None, payload: int) -> None:
|
|
if not address:
|
|
return
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN,
|
|
"send",
|
|
{"address": address, "payload": payload},
|
|
blocking=False,
|
|
)
|
|
|
|
async def _knx_send_typed(
|
|
self, address: str | None, payload: Any, dpt_type: str
|
|
) -> None:
|
|
if not address:
|
|
return
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN,
|
|
"send",
|
|
{"address": address, "payload": payload, "type": dpt_type},
|
|
blocking=False,
|
|
)
|
|
|
|
async def _knx_send_percent(self, address: str | None, value: int) -> None:
|
|
if not address:
|
|
return
|
|
value = _clamp_percent(value)
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN,
|
|
"send",
|
|
{"address": address, "payload": value, "type": "percent"},
|
|
blocking=False,
|
|
)
|
|
|
|
def _update_light_components_from_state(
|
|
self,
|
|
port: LightPort,
|
|
is_on: bool,
|
|
brightness: int | None,
|
|
rgb_color: tuple[int, int, int] | None,
|
|
rgbw_color: tuple[int, int, int, int] | None,
|
|
) -> None:
|
|
if not is_on:
|
|
self._light_components[port.entity_id] = {"r": 0, "g": 0, "b": 0, "w": 0}
|
|
return
|
|
|
|
scale = 1.0
|
|
if brightness is not None:
|
|
scale = max(0.0, min(1.0, brightness / 255))
|
|
|
|
components = self._light_components.get(port.entity_id, {})
|
|
if rgbw_color is not None:
|
|
components = {
|
|
"r": int(round(rgbw_color[0] * scale)),
|
|
"g": int(round(rgbw_color[1] * scale)),
|
|
"b": int(round(rgbw_color[2] * scale)),
|
|
"w": int(round(rgbw_color[3] * scale)),
|
|
}
|
|
elif rgb_color is not None:
|
|
components = {
|
|
"r": int(round(rgb_color[0] * scale)),
|
|
"g": int(round(rgb_color[1] * scale)),
|
|
"b": int(round(rgb_color[2] * scale)),
|
|
"w": components.get("w", 0),
|
|
}
|
|
self._light_components[port.entity_id] = components
|
|
|
|
async def _send_light_component_states(self, port: LightPort) -> None:
|
|
components = self._light_components.get(port.entity_id)
|
|
if not components:
|
|
return
|
|
|
|
await self._send_light_component_state(
|
|
port.red_state_address,
|
|
port.red_state_invert_outgoing,
|
|
components.get("r", 0),
|
|
)
|
|
await self._send_light_component_brightness_state(
|
|
port.red_brightness_state_address,
|
|
port.red_brightness_state_invert_outgoing,
|
|
components.get("r", 0),
|
|
)
|
|
await self._send_light_component_state(
|
|
port.green_state_address,
|
|
port.green_state_invert_outgoing,
|
|
components.get("g", 0),
|
|
)
|
|
await self._send_light_component_brightness_state(
|
|
port.green_brightness_state_address,
|
|
port.green_brightness_state_invert_outgoing,
|
|
components.get("g", 0),
|
|
)
|
|
await self._send_light_component_state(
|
|
port.blue_state_address,
|
|
port.blue_state_invert_outgoing,
|
|
components.get("b", 0),
|
|
)
|
|
await self._send_light_component_brightness_state(
|
|
port.blue_brightness_state_address,
|
|
port.blue_brightness_state_invert_outgoing,
|
|
components.get("b", 0),
|
|
)
|
|
await self._send_light_component_state(
|
|
port.white_state_address,
|
|
port.white_state_invert_outgoing,
|
|
components.get("w", 0),
|
|
)
|
|
await self._send_light_component_brightness_state(
|
|
port.white_brightness_state_address,
|
|
port.white_brightness_state_invert_outgoing,
|
|
components.get("w", 0),
|
|
)
|
|
|
|
async def _send_light_component_state(
|
|
self,
|
|
address: str | None,
|
|
invert_outgoing: bool,
|
|
value: int,
|
|
) -> None:
|
|
if not address:
|
|
return
|
|
payload = 1 if value > 0 else 0
|
|
if invert_outgoing:
|
|
payload = _invert_value(payload, address, self._address_options)
|
|
await self._knx_send_raw(address, payload)
|
|
|
|
async def _send_light_component_brightness_state(
|
|
self,
|
|
address: str | None,
|
|
invert_outgoing: bool,
|
|
value: int,
|
|
) -> None:
|
|
if not address:
|
|
return
|
|
payload = _brightness_to_percent(value)
|
|
if invert_outgoing:
|
|
payload = _invert_value(payload, address, self._address_options)
|
|
await self._knx_send_percent(address, payload)
|
|
|
|
async def _handle_light_component_action(
|
|
self, port: LightPort, color: str, action: str, event: Event
|
|
) -> None:
|
|
components = self._light_components.setdefault(
|
|
port.entity_id, {"r": 0, "g": 0, "b": 0, "w": 0}
|
|
)
|
|
if action.endswith("_switch"):
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
if value == 0:
|
|
components[color[0]] = 0
|
|
elif value == 1 and components[color[0]] == 0:
|
|
components[color[0]] = 255
|
|
elif action.endswith("_brightness"):
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
value = _clamp_percent(value)
|
|
components[color[0]] = _percent_to_brightness(value)
|
|
else:
|
|
return
|
|
|
|
if all(value == 0 for value in components.values()):
|
|
await self._call_light_service(port.entity_id, "turn_off")
|
|
return
|
|
|
|
if _light_uses_white_channel(port):
|
|
await self._call_light_service(
|
|
port.entity_id,
|
|
"turn_on",
|
|
{
|
|
"rgbw_color": (
|
|
components["r"],
|
|
components["g"],
|
|
components["b"],
|
|
components["w"],
|
|
)
|
|
},
|
|
)
|
|
else:
|
|
await self._call_light_service(
|
|
port.entity_id,
|
|
"turn_on",
|
|
{
|
|
"rgb_color": (
|
|
components["r"],
|
|
components["g"],
|
|
components["b"],
|
|
)
|
|
},
|
|
)
|
|
|
|
def _start_light_dimming(
|
|
self, entity_id: str, direction: str, percent: int
|
|
) -> None:
|
|
self._stop_light_dimming(entity_id)
|
|
|
|
async def _runner() -> None:
|
|
while True:
|
|
step = percent if direction == "up" else -percent
|
|
await self._call_light_service(
|
|
entity_id, "turn_on", {"brightness_step_pct": step}
|
|
)
|
|
await asyncio.sleep(0.3)
|
|
|
|
self._light_dimming_tasks[entity_id] = self.hass.async_create_task(
|
|
_runner()
|
|
)
|
|
|
|
def _stop_light_dimming(self, entity_id: str) -> None:
|
|
task = self._light_dimming_tasks.pop(entity_id, None)
|
|
if task is not None:
|
|
task.cancel()
|
|
|
|
def _start_light_ct_adjust(
|
|
self, port: LightPort, direction: str, percent: int
|
|
) -> None:
|
|
self._stop_light_ct_adjust(port.entity_id)
|
|
|
|
async def _runner() -> None:
|
|
while True:
|
|
current_kelvin = _current_color_temp_kelvin(self.hass, port)
|
|
if current_kelvin is None:
|
|
current_kelvin = _light_min_kelvin(port)
|
|
delta = (_light_max_kelvin(port) - _light_min_kelvin(port)) * (
|
|
percent / 100
|
|
)
|
|
if direction == "up":
|
|
next_kelvin = current_kelvin + delta
|
|
else:
|
|
next_kelvin = current_kelvin - delta
|
|
next_kelvin = min(
|
|
max(next_kelvin, _light_min_kelvin(port)),
|
|
_light_max_kelvin(port),
|
|
)
|
|
await self._call_light_service(
|
|
port.entity_id,
|
|
"turn_on",
|
|
{"color_temp": _kelvin_to_mireds(next_kelvin)},
|
|
)
|
|
await asyncio.sleep(0.3)
|
|
|
|
self._light_ct_tasks[port.entity_id] = self.hass.async_create_task(
|
|
_runner()
|
|
)
|
|
|
|
def _stop_light_ct_adjust(self, entity_id: str) -> None:
|
|
task = self._light_ct_tasks.pop(entity_id, None)
|
|
if task is not None:
|
|
task.cancel()
|
|
|
|
|
|
def _extract_event_value(event: Event) -> int | None:
|
|
if "value" in event.data:
|
|
value = event.data["value"]
|
|
mapped = _map_scalar_value(value)
|
|
if mapped is not None:
|
|
return mapped
|
|
data = event.data.get("data")
|
|
if data is None:
|
|
return None
|
|
if isinstance(data, list) and data:
|
|
data = data[0]
|
|
mapped = _map_scalar_value(data)
|
|
if mapped is not None:
|
|
return mapped
|
|
try:
|
|
return int(data) & 1
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _clean_address(address: Any) -> str | None:
|
|
if not address:
|
|
return None
|
|
if isinstance(address, str):
|
|
stripped = address.strip()
|
|
return stripped or None
|
|
return None
|
|
|
|
|
|
def _clean_bool(value: Any) -> bool:
|
|
return bool(value)
|
|
|
|
|
|
def _clean_int(value: Any) -> int | None:
|
|
if value is None or value == "":
|
|
return None
|
|
try:
|
|
return int(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _invert_value(
|
|
value: int,
|
|
address: str | None,
|
|
address_options: dict[str, AddressOptions],
|
|
) -> int:
|
|
if address is None:
|
|
return value
|
|
options = address_options.get(address)
|
|
if options is None:
|
|
return value
|
|
if options.value_type == "percent":
|
|
value = _clamp_percent(value)
|
|
if not options.invert_outgoing:
|
|
return value
|
|
if options.value_type == "percent":
|
|
return 100 - value
|
|
if value not in (0, 1):
|
|
return value
|
|
return 0 if value == 1 else 1
|
|
|
|
|
|
def _invert_out_key(address_key: str) -> str:
|
|
return f"{address_key}_{CONF_INVERT_OUTGOING}"
|
|
|
|
|
|
def _get_value_type(address_key: str) -> str | None:
|
|
return ADDRESS_VALUE_TYPE.get(address_key)
|
|
|
|
|
|
def _get_event_type(address_key: str) -> str | None:
|
|
return ADDRESS_EVENT_TYPE.get(address_key)
|
|
|
|
|
|
def _clamp_percent(value: int) -> int:
|
|
if value < 0:
|
|
return 0
|
|
if value > 100:
|
|
return 100
|
|
return value
|
|
|
|
|
|
def _brightness_to_percent(brightness: int) -> int:
|
|
if brightness <= 0:
|
|
return 0
|
|
if brightness >= 255:
|
|
return 100
|
|
return int(round(brightness / 255 * 100))
|
|
|
|
|
|
def _percent_to_brightness(percent: int) -> int:
|
|
percent = _clamp_percent(percent)
|
|
return int(round(percent / 100 * 255))
|
|
|
|
|
|
def _kelvin_to_mireds(kelvin: float) -> int:
|
|
if kelvin <= 0:
|
|
return 0
|
|
return int(round(1000000 / kelvin))
|
|
|
|
|
|
def _mireds_to_kelvin(mireds: float) -> float:
|
|
if mireds <= 0:
|
|
return 0
|
|
return 1000000 / mireds
|
|
|
|
|
|
def _extract_rgb_value(event: Event) -> tuple[int, int, int] | None:
|
|
value = event.data.get("value")
|
|
if isinstance(value, dict):
|
|
red = value.get("red", value.get("r"))
|
|
green = value.get("green", value.get("g"))
|
|
blue = value.get("blue", value.get("b"))
|
|
if red is not None and green is not None and blue is not None:
|
|
return (int(red), int(green), int(blue))
|
|
if isinstance(value, (list, tuple)) and len(value) >= 3:
|
|
return (int(value[0]), int(value[1]), int(value[2]))
|
|
data = event.data.get("data")
|
|
if isinstance(data, (list, tuple)) and len(data) >= 3:
|
|
return (int(data[0]), int(data[1]), int(data[2]))
|
|
return None
|
|
|
|
|
|
def _extract_rgbw_value(event: Event) -> tuple[int, int, int, int] | None:
|
|
value = event.data.get("value")
|
|
if isinstance(value, dict):
|
|
red = value.get("red", value.get("r"))
|
|
green = value.get("green", value.get("g"))
|
|
blue = value.get("blue", value.get("b"))
|
|
white = value.get("white", value.get("w"))
|
|
if (
|
|
red is not None
|
|
and green is not None
|
|
and blue is not None
|
|
and white is not None
|
|
):
|
|
return (int(red), int(green), int(blue), int(white))
|
|
if isinstance(value, (list, tuple)) and len(value) >= 4:
|
|
return (int(value[0]), int(value[1]), int(value[2]), int(value[3]))
|
|
data = event.data.get("data")
|
|
if isinstance(data, (list, tuple)) and len(data) >= 4:
|
|
return (int(data[0]), int(data[1]), int(data[2]), int(data[3]))
|
|
return None
|
|
|
|
|
|
def _extract_xyy_value(
|
|
event: Event,
|
|
) -> tuple[float, float, float | None] | None:
|
|
value = event.data.get("value")
|
|
if isinstance(value, dict):
|
|
x = value.get("x")
|
|
y = value.get("y")
|
|
luminance = value.get("Y", value.get("luminance", value.get("brightness")))
|
|
if x is not None and y is not None:
|
|
return (
|
|
float(x),
|
|
float(y),
|
|
float(luminance) if luminance is not None else None,
|
|
)
|
|
if isinstance(value, (list, tuple)) and len(value) >= 2:
|
|
x = value[0]
|
|
y = value[1]
|
|
luminance = value[2] if len(value) > 2 else None
|
|
return (
|
|
float(x),
|
|
float(y),
|
|
float(luminance) if luminance is not None else None,
|
|
)
|
|
data = event.data.get("data")
|
|
if isinstance(data, (list, tuple)) and len(data) >= 2:
|
|
x = data[0]
|
|
y = data[1]
|
|
luminance = data[2] if len(data) > 2 else None
|
|
return (
|
|
float(x),
|
|
float(y),
|
|
float(luminance) if luminance is not None else None,
|
|
)
|
|
return None
|
|
|
|
|
|
def _extract_float_value(event: Event) -> float | None:
|
|
if "value" in event.data:
|
|
value = event.data["value"]
|
|
if isinstance(value, (int, float)):
|
|
return float(value)
|
|
mapped = _extract_event_value(event)
|
|
if mapped is None:
|
|
return None
|
|
return float(mapped)
|
|
|
|
|
|
def _current_hs_color(
|
|
hass: HomeAssistant, entity_id: str
|
|
) -> tuple[float, float] | None:
|
|
state = hass.states.get(entity_id)
|
|
if state is None:
|
|
return None
|
|
hs_color = state.attributes.get(ATTR_HS_COLOR)
|
|
if hs_color is None:
|
|
return None
|
|
try:
|
|
return float(hs_color[0]), float(hs_color[1])
|
|
except (TypeError, ValueError, IndexError):
|
|
return None
|
|
|
|
|
|
def _current_color_temp_kelvin(
|
|
hass: HomeAssistant, port: LightPort
|
|
) -> float | None:
|
|
state = hass.states.get(port.entity_id)
|
|
if state is None:
|
|
return None
|
|
if ATTR_COLOR_TEMP_KELVIN in state.attributes:
|
|
try:
|
|
return float(state.attributes[ATTR_COLOR_TEMP_KELVIN])
|
|
except (TypeError, ValueError):
|
|
return None
|
|
mireds = state.attributes.get(ATTR_COLOR_TEMP)
|
|
if mireds is None:
|
|
return None
|
|
try:
|
|
return _mireds_to_kelvin(float(mireds))
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _light_color_temperature_event_type(mode: str) -> str | None:
|
|
if mode == "relative":
|
|
return "percent"
|
|
if mode == "absolute":
|
|
return "2byte_unsigned"
|
|
if mode == "absolute_float":
|
|
return "2byte_float"
|
|
return None
|
|
|
|
|
|
def _color_temperature_payload(
|
|
kelvin: float, port: LightPort, clamp: bool = False
|
|
) -> tuple[float | int, str]:
|
|
min_kelvin = _light_min_kelvin(port)
|
|
max_kelvin = _light_max_kelvin(port)
|
|
if clamp:
|
|
kelvin = min(max(kelvin, min_kelvin), max_kelvin)
|
|
if port.color_temperature_mode == "relative":
|
|
percent = (kelvin - min_kelvin) / (max_kelvin - min_kelvin) * 100
|
|
return int(round(_clamp_percent(int(round(percent))))), "percent"
|
|
if port.color_temperature_mode == "absolute_float":
|
|
return float(kelvin), "2byte_float"
|
|
return int(round(kelvin)), "2byte_unsigned"
|
|
|
|
|
|
def _color_temperature_from_value(
|
|
value: float, port: LightPort
|
|
) -> float | None:
|
|
min_kelvin = _light_min_kelvin(port)
|
|
max_kelvin = _light_max_kelvin(port)
|
|
if port.color_temperature_mode == "relative":
|
|
percent = _clamp_percent(int(round(value)))
|
|
return min_kelvin + (max_kelvin - min_kelvin) * (percent / 100)
|
|
if port.color_temperature_mode == "absolute":
|
|
return float(value)
|
|
if port.color_temperature_mode == "absolute_float":
|
|
return float(value)
|
|
return None
|
|
|
|
|
|
def _light_min_kelvin(port: LightPort) -> int:
|
|
return port.min_kelvin or 2000
|
|
|
|
|
|
def _light_max_kelvin(port: LightPort) -> int:
|
|
return port.max_kelvin or 6500
|
|
|
|
|
|
def _light_uses_white_channel(port: LightPort) -> bool:
|
|
return any(
|
|
[
|
|
port.white_address,
|
|
port.white_state_address,
|
|
port.white_brightness_address,
|
|
port.white_brightness_state_address,
|
|
]
|
|
)
|
|
|
|
|
|
def _relative_dimming_step(value: int) -> tuple[str, int] | None:
|
|
value = value & 0x0F
|
|
if value == 0 or value == 8:
|
|
return None
|
|
if value <= 7:
|
|
direction = "down"
|
|
step = value
|
|
else:
|
|
direction = "up"
|
|
step = value - 8
|
|
percent_map = {
|
|
1: 10,
|
|
2: 8,
|
|
3: 6,
|
|
4: 4,
|
|
5: 3,
|
|
6: 2,
|
|
7: 1,
|
|
}
|
|
percent = percent_map.get(step)
|
|
if percent is None:
|
|
return None
|
|
return direction, percent
|
|
|
|
|
|
def _map_scalar_value(value: Any) -> int | None:
|
|
if isinstance(value, bool):
|
|
return 1 if value else 0
|
|
if isinstance(value, (int, float)):
|
|
return int(value)
|
|
if isinstance(value, str):
|
|
text = value.strip().lower()
|
|
if text in ("on", "true", "yes", "1", "down", "close", "closed"):
|
|
return 1
|
|
if text in ("off", "false", "no", "0", "up", "open", "opened"):
|
|
return 0
|
|
try:
|
|
return int(text)
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
|
|
def _event_destination(event: Event) -> str | None:
|
|
return (
|
|
event.data.get("destination")
|
|
or event.data.get("destination_address")
|
|
or event.data.get("address")
|
|
)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class PortConfig:
|
|
port_id: str
|
|
port_type: str
|
|
title: str
|
|
data: dict[str, Any]
|
|
enabled: bool | None
|
|
|
|
|
|
def _iter_port_configs(entry: ConfigEntry) -> list[PortConfig]:
|
|
ports: list[PortConfig] = []
|
|
subentries = getattr(entry, "subentries", [])
|
|
for subentry in subentries:
|
|
ports.append(
|
|
PortConfig(
|
|
port_id=subentry.entry_id,
|
|
port_type=subentry.type,
|
|
title=subentry.title or subentry.entry_id,
|
|
data=subentry.data,
|
|
enabled=subentry.data.get(CONF_ENABLED),
|
|
)
|
|
)
|
|
option_ports = entry.options.get(CONF_PORTS, [])
|
|
for port in option_ports:
|
|
port_type = port.get("type")
|
|
data = port.get("data", {})
|
|
if port_type and isinstance(data, dict):
|
|
ports.append(
|
|
PortConfig(
|
|
port_id=port.get("id", ""),
|
|
port_type=port_type,
|
|
title=port.get("title", port.get("id", "Port")),
|
|
data=data,
|
|
enabled=port.get("enabled"),
|
|
)
|
|
)
|
|
return ports
|
|
|
|
|
|
def _is_port_enabled(port: PortConfig, overrides: dict[str, Any]) -> bool:
|
|
if port.port_id and port.port_id in overrides:
|
|
return bool(overrides[port.port_id])
|
|
if port.enabled is None:
|
|
return True
|
|
return bool(port.enabled)
|
|
|
|
|
|
def iter_ports(entry: ConfigEntry) -> list[PortConfig]:
|
|
return _iter_port_configs(entry)
|