mirror of
https://github.com/bahmcloud/HA-KNX-Bridge.git
synced 2026-04-06 14:31:13 +00:00
626 lines
22 KiB
Python
626 lines
22 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
import logging
|
|
from typing import Any
|
|
|
|
from homeassistant.config_entries import ConfigEntry
|
|
from homeassistant.core import Event, HomeAssistant, State
|
|
from homeassistant.exceptions import ConfigEntryNotReady
|
|
from homeassistant.helpers import event as event_helper
|
|
|
|
from .const import (
|
|
CONF_ANGLE_ADDRESS,
|
|
CONF_ANGLE_STATE_ADDRESS,
|
|
CONF_COMMAND_ADDRESS,
|
|
CONF_INVERT_INCOMING,
|
|
CONF_INVERT_OUTGOING,
|
|
CONF_MOVE_LONG_ADDRESS,
|
|
CONF_MOVE_SHORT_ADDRESS,
|
|
CONF_POSITION_ADDRESS,
|
|
CONF_POSITION_STATE_ADDRESS,
|
|
CONF_STATE_ADDRESS,
|
|
CONF_STOP_ADDRESS,
|
|
)
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
KNX_DOMAIN = "knx"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class BinarySensorPort:
|
|
entity_id: str
|
|
state_address: str | None
|
|
state_invert_incoming: bool
|
|
state_invert_outgoing: bool
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class CoverPort:
|
|
entity_id: str
|
|
move_long_address: str | None
|
|
move_long_invert_incoming: bool
|
|
move_long_invert_outgoing: bool
|
|
move_short_address: str | None
|
|
move_short_invert_incoming: bool
|
|
move_short_invert_outgoing: bool
|
|
stop_address: str | None
|
|
stop_invert_incoming: bool
|
|
stop_invert_outgoing: bool
|
|
position_address: str | None
|
|
position_invert_incoming: bool
|
|
position_invert_outgoing: bool
|
|
position_state_address: str | None
|
|
position_state_invert_incoming: bool
|
|
position_state_invert_outgoing: bool
|
|
angle_address: str | None
|
|
angle_invert_incoming: bool
|
|
angle_invert_outgoing: bool
|
|
angle_state_address: str | None
|
|
angle_state_invert_incoming: bool
|
|
angle_state_invert_outgoing: bool
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class SwitchPort:
|
|
entity_id: str
|
|
command_address: str | None
|
|
state_address: str | None
|
|
command_invert_incoming: bool
|
|
command_invert_outgoing: bool
|
|
state_invert_incoming: bool
|
|
state_invert_outgoing: bool
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AddressOptions:
|
|
value_type: str | None
|
|
invert_incoming: bool
|
|
invert_outgoing: bool
|
|
|
|
|
|
class BridgeManager:
|
|
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
|
|
self.hass = hass
|
|
self.entry = entry
|
|
self._unsub_listeners: list[callable] = []
|
|
self._knx_event_unsub: callable | None = None
|
|
self._address_handlers: dict[str, callable[[Event], Any]] = {}
|
|
self._registered_addresses: list[tuple[str, str | None]] = []
|
|
self._address_options: dict[str, AddressOptions] = {}
|
|
|
|
async def async_setup(self) -> None:
|
|
if not self.hass.services.has_service(KNX_DOMAIN, "send"):
|
|
raise ConfigEntryNotReady("KNX integration services not available")
|
|
|
|
binary_ports, switch_ports, cover_ports = self._load_ports()
|
|
self._register_outgoing(binary_ports, switch_ports, cover_ports)
|
|
await self._register_incoming(switch_ports, cover_ports)
|
|
|
|
async def async_unload(self) -> None:
|
|
for unsub in self._unsub_listeners:
|
|
unsub()
|
|
self._unsub_listeners.clear()
|
|
|
|
if self._knx_event_unsub is not None:
|
|
self._knx_event_unsub()
|
|
self._knx_event_unsub = None
|
|
|
|
await self._unregister_knx_events()
|
|
|
|
def _load_ports(
|
|
self,
|
|
) -> tuple[list[BinarySensorPort], list[SwitchPort], list[CoverPort]]:
|
|
binary_ports: list[BinarySensorPort] = []
|
|
switch_ports: list[SwitchPort] = []
|
|
cover_ports: list[CoverPort] = []
|
|
|
|
subentries = getattr(self.entry, "subentries", [])
|
|
for subentry in subentries:
|
|
data = subentry.data
|
|
if subentry.type == "binary_sensor":
|
|
binary_ports.append(
|
|
BinarySensorPort(
|
|
entity_id=data["entity_id"],
|
|
state_address=_clean_address(data.get(CONF_STATE_ADDRESS)),
|
|
state_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_STATE_ADDRESS))
|
|
),
|
|
state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_STATE_ADDRESS))
|
|
),
|
|
)
|
|
)
|
|
elif subentry.type == "switch":
|
|
switch_ports.append(
|
|
SwitchPort(
|
|
entity_id=data["entity_id"],
|
|
command_address=_clean_address(
|
|
data.get(CONF_COMMAND_ADDRESS)
|
|
),
|
|
state_address=_clean_address(data.get(CONF_STATE_ADDRESS)),
|
|
command_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_COMMAND_ADDRESS))
|
|
),
|
|
command_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_COMMAND_ADDRESS))
|
|
),
|
|
state_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_STATE_ADDRESS))
|
|
),
|
|
state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_STATE_ADDRESS))
|
|
),
|
|
)
|
|
)
|
|
elif subentry.type == "cover":
|
|
cover_ports.append(
|
|
CoverPort(
|
|
entity_id=data["entity_id"],
|
|
move_long_address=_clean_address(data.get(CONF_MOVE_LONG_ADDRESS)),
|
|
move_long_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_MOVE_LONG_ADDRESS))
|
|
),
|
|
move_long_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_MOVE_LONG_ADDRESS))
|
|
),
|
|
move_short_address=_clean_address(data.get(CONF_MOVE_SHORT_ADDRESS)),
|
|
move_short_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_MOVE_SHORT_ADDRESS))
|
|
),
|
|
move_short_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_MOVE_SHORT_ADDRESS))
|
|
),
|
|
stop_address=_clean_address(data.get(CONF_STOP_ADDRESS)),
|
|
stop_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_STOP_ADDRESS))
|
|
),
|
|
stop_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_STOP_ADDRESS))
|
|
),
|
|
position_address=_clean_address(data.get(CONF_POSITION_ADDRESS)),
|
|
position_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_POSITION_ADDRESS))
|
|
),
|
|
position_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_POSITION_ADDRESS))
|
|
),
|
|
position_state_address=_clean_address(
|
|
data.get(CONF_POSITION_STATE_ADDRESS)
|
|
),
|
|
position_state_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_POSITION_STATE_ADDRESS))
|
|
),
|
|
position_state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_POSITION_STATE_ADDRESS))
|
|
),
|
|
angle_address=_clean_address(data.get(CONF_ANGLE_ADDRESS)),
|
|
angle_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_ANGLE_ADDRESS))
|
|
),
|
|
angle_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_ANGLE_ADDRESS))
|
|
),
|
|
angle_state_address=_clean_address(
|
|
data.get(CONF_ANGLE_STATE_ADDRESS)
|
|
),
|
|
angle_state_invert_incoming=_clean_bool(
|
|
data.get(_invert_in_key(CONF_ANGLE_STATE_ADDRESS))
|
|
),
|
|
angle_state_invert_outgoing=_clean_bool(
|
|
data.get(_invert_out_key(CONF_ANGLE_STATE_ADDRESS))
|
|
),
|
|
)
|
|
)
|
|
|
|
return binary_ports, switch_ports, cover_ports
|
|
|
|
def _register_outgoing(
|
|
self,
|
|
binary_ports: list[BinarySensorPort],
|
|
switch_ports: list[SwitchPort],
|
|
cover_ports: list[CoverPort],
|
|
) -> None:
|
|
for port in binary_ports:
|
|
if not port.state_address:
|
|
continue
|
|
self._remember_address_options(
|
|
port.state_address,
|
|
None,
|
|
port.state_invert_incoming,
|
|
port.state_invert_outgoing,
|
|
)
|
|
self._unsub_listeners.append(
|
|
event_helper.async_track_state_change_event(
|
|
self.hass, [port.entity_id], self._binary_sensor_changed(port)
|
|
)
|
|
)
|
|
|
|
for port in switch_ports:
|
|
if not port.state_address:
|
|
continue
|
|
self._remember_address_options(
|
|
port.state_address,
|
|
None,
|
|
port.state_invert_incoming,
|
|
port.state_invert_outgoing,
|
|
)
|
|
self._unsub_listeners.append(
|
|
event_helper.async_track_state_change_event(
|
|
self.hass, [port.entity_id], self._switch_changed(port)
|
|
)
|
|
)
|
|
|
|
for port in cover_ports:
|
|
if not (port.position_state_address or port.angle_state_address):
|
|
continue
|
|
if port.position_state_address:
|
|
self._remember_address_options(
|
|
port.position_state_address,
|
|
"percent",
|
|
port.position_state_invert_incoming,
|
|
port.position_state_invert_outgoing,
|
|
)
|
|
if port.angle_state_address:
|
|
self._remember_address_options(
|
|
port.angle_state_address,
|
|
"percent",
|
|
port.angle_state_invert_incoming,
|
|
port.angle_state_invert_outgoing,
|
|
)
|
|
self._unsub_listeners.append(
|
|
event_helper.async_track_state_change_event(
|
|
self.hass, [port.entity_id], self._cover_changed(port)
|
|
)
|
|
)
|
|
|
|
async def _register_incoming(
|
|
self, switch_ports: list[SwitchPort], cover_ports: list[CoverPort]
|
|
) -> None:
|
|
for port in switch_ports:
|
|
self._register_knx_switch_address(
|
|
port.command_address,
|
|
port.command_invert_incoming,
|
|
port.command_invert_outgoing,
|
|
port,
|
|
)
|
|
|
|
for port in cover_ports:
|
|
self._register_knx_address(
|
|
port.move_long_address,
|
|
None,
|
|
port.move_long_invert_incoming,
|
|
port.move_long_invert_outgoing,
|
|
port,
|
|
"move_long",
|
|
)
|
|
self._register_knx_address(
|
|
port.move_short_address,
|
|
None,
|
|
port.move_short_invert_incoming,
|
|
port.move_short_invert_outgoing,
|
|
port,
|
|
"move_short",
|
|
)
|
|
self._register_knx_address(
|
|
port.stop_address,
|
|
None,
|
|
port.stop_invert_incoming,
|
|
port.stop_invert_outgoing,
|
|
port,
|
|
"stop",
|
|
)
|
|
self._register_knx_address(
|
|
port.position_address,
|
|
"percent",
|
|
port.position_invert_incoming,
|
|
port.position_invert_outgoing,
|
|
port,
|
|
"position",
|
|
)
|
|
self._register_knx_address(
|
|
port.angle_address,
|
|
"percent",
|
|
port.angle_invert_incoming,
|
|
port.angle_invert_outgoing,
|
|
port,
|
|
"angle",
|
|
)
|
|
|
|
if self._address_handlers:
|
|
self._knx_event_unsub = event_helper.async_track_event(
|
|
self.hass, "knx_event", self._handle_knx_event
|
|
)
|
|
|
|
await self._register_knx_events()
|
|
|
|
def _register_knx_address(
|
|
self,
|
|
address: str | None,
|
|
value_type: str | None,
|
|
invert_incoming: bool,
|
|
invert_outgoing: bool,
|
|
port: CoverPort,
|
|
action: str,
|
|
) -> None:
|
|
if not address:
|
|
return
|
|
|
|
self._address_handlers[address] = lambda event: self._handle_cover_action(
|
|
port, action, event
|
|
)
|
|
self._remember_address_options(
|
|
address, value_type, invert_incoming, invert_outgoing
|
|
)
|
|
self._registered_addresses.append((address, value_type))
|
|
|
|
def _register_knx_switch_address(
|
|
self,
|
|
address: str | None,
|
|
invert_incoming: bool,
|
|
invert_outgoing: bool,
|
|
port: SwitchPort,
|
|
) -> None:
|
|
if not address:
|
|
return
|
|
self._address_handlers[address] = lambda event: self._handle_switch_action(
|
|
port, event
|
|
)
|
|
self._remember_address_options(
|
|
address, None, invert_incoming, invert_outgoing
|
|
)
|
|
self._registered_addresses.append((address, None))
|
|
|
|
def _remember_address_options(
|
|
self,
|
|
address: str,
|
|
value_type: str | None,
|
|
invert_incoming: bool,
|
|
invert_outgoing: bool,
|
|
) -> None:
|
|
self._address_options[address] = AddressOptions(
|
|
value_type=value_type,
|
|
invert_incoming=invert_incoming,
|
|
invert_outgoing=invert_outgoing,
|
|
)
|
|
|
|
async def _register_knx_events(self) -> None:
|
|
for address, value_type in self._registered_addresses:
|
|
data: dict[str, Any] = {"address": address}
|
|
if value_type:
|
|
data["type"] = value_type
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN, "event_register", data, blocking=False
|
|
)
|
|
|
|
async def _unregister_knx_events(self) -> None:
|
|
for address, value_type in self._registered_addresses:
|
|
data: dict[str, Any] = {"address": address, "remove": True}
|
|
if value_type:
|
|
data["type"] = value_type
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN, "event_register", data, blocking=False
|
|
)
|
|
self._registered_addresses.clear()
|
|
|
|
def _binary_sensor_changed(self, port: BinarySensorPort) -> callable[[Event], Any]:
|
|
async def _handler(event: Event) -> None:
|
|
new_state: State | None = event.data.get("new_state")
|
|
if new_state is None:
|
|
return
|
|
if new_state.state not in ("on", "off"):
|
|
return
|
|
payload = 1 if new_state.state == "on" else 0
|
|
payload = _invert_value(
|
|
payload, port.state_address, self._address_options, "outgoing"
|
|
)
|
|
await self._knx_send_raw(port.state_address, payload)
|
|
|
|
return _handler
|
|
|
|
def _switch_changed(self, port: SwitchPort) -> callable[[Event], Any]:
|
|
async def _handler(event: Event) -> None:
|
|
new_state: State | None = event.data.get("new_state")
|
|
if new_state is None:
|
|
return
|
|
if new_state.state not in ("on", "off"):
|
|
return
|
|
payload = 1 if new_state.state == "on" else 0
|
|
payload = _invert_value(
|
|
payload, port.state_address, self._address_options, "outgoing"
|
|
)
|
|
await self._knx_send_raw(port.state_address, payload)
|
|
|
|
return _handler
|
|
|
|
def _cover_changed(self, port: CoverPort) -> callable[[Event], Any]:
|
|
async def _handler(event: Event) -> None:
|
|
new_state: State | None = event.data.get("new_state")
|
|
if new_state is None:
|
|
return
|
|
|
|
if port.position_state_address is not None:
|
|
position = new_state.attributes.get("current_position")
|
|
if position is not None:
|
|
position = _invert_value(
|
|
position,
|
|
port.position_state_address,
|
|
self._address_options,
|
|
"outgoing",
|
|
)
|
|
await self._knx_send_percent(
|
|
port.position_state_address, position
|
|
)
|
|
|
|
if port.angle_state_address is not None:
|
|
angle = new_state.attributes.get("current_tilt_position")
|
|
if angle is not None:
|
|
angle = _invert_value(
|
|
angle,
|
|
port.angle_state_address,
|
|
self._address_options,
|
|
"outgoing",
|
|
)
|
|
await self._knx_send_percent(port.angle_state_address, angle)
|
|
|
|
return _handler
|
|
|
|
async def _handle_knx_event(self, event: Event) -> None:
|
|
if event.data.get("direction") != "Incoming":
|
|
return
|
|
|
|
destination = event.data.get("destination")
|
|
if not destination:
|
|
return
|
|
|
|
handler = self._address_handlers.get(destination)
|
|
if handler is None:
|
|
return
|
|
|
|
await handler(event)
|
|
|
|
async def _handle_cover_action(
|
|
self, port: CoverPort, action: str, event: Event
|
|
) -> None:
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
value = _invert_value(
|
|
value, event.data.get("destination"), self._address_options, "incoming"
|
|
)
|
|
|
|
if action == "move_long":
|
|
if value == 0:
|
|
await self._call_cover_service(port.entity_id, "open_cover")
|
|
elif value == 1:
|
|
await self._call_cover_service(port.entity_id, "close_cover")
|
|
elif action == "move_short":
|
|
if value in (0, 1):
|
|
await self._call_cover_service(port.entity_id, "stop_cover")
|
|
elif action == "stop":
|
|
if value in (0, 1):
|
|
await self._call_cover_service(port.entity_id, "stop_cover")
|
|
elif action == "position":
|
|
await self._call_cover_service(
|
|
port.entity_id, "set_cover_position", {"position": value}
|
|
)
|
|
elif action == "angle":
|
|
await self._call_cover_service(
|
|
port.entity_id, "set_cover_tilt_position", {"tilt_position": value}
|
|
)
|
|
|
|
async def _handle_switch_action(self, port: SwitchPort, event: Event) -> None:
|
|
value = _extract_event_value(event)
|
|
if value is None:
|
|
return
|
|
value = _invert_value(
|
|
value, event.data.get("destination"), self._address_options, "incoming"
|
|
)
|
|
if value == 0:
|
|
await self._call_switch_service(port.entity_id, "turn_off")
|
|
elif value == 1:
|
|
await self._call_switch_service(port.entity_id, "turn_on")
|
|
|
|
async def _call_cover_service(
|
|
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
|
|
) -> None:
|
|
data = {"entity_id": entity_id}
|
|
if service_data:
|
|
data.update(service_data)
|
|
await self.hass.services.async_call(
|
|
"cover", service, data, blocking=False
|
|
)
|
|
|
|
async def _call_switch_service(
|
|
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
|
|
) -> None:
|
|
data = {"entity_id": entity_id}
|
|
if service_data:
|
|
data.update(service_data)
|
|
await self.hass.services.async_call(
|
|
"switch", service, data, blocking=False
|
|
)
|
|
|
|
async def _knx_send_raw(self, address: str | None, payload: int) -> None:
|
|
if not address:
|
|
return
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN,
|
|
"send",
|
|
{"address": address, "payload": payload, "payload_length": 0},
|
|
blocking=False,
|
|
)
|
|
|
|
async def _knx_send_percent(self, address: str | None, value: int) -> None:
|
|
if not address:
|
|
return
|
|
await self.hass.services.async_call(
|
|
KNX_DOMAIN,
|
|
"send",
|
|
{"address": address, "payload": value, "type": "percent"},
|
|
blocking=False,
|
|
)
|
|
|
|
|
|
def _extract_event_value(event: Event) -> int | None:
|
|
if "value" in event.data:
|
|
try:
|
|
return int(event.data["value"])
|
|
except (TypeError, ValueError):
|
|
return None
|
|
data = event.data.get("data")
|
|
if data is None:
|
|
return None
|
|
if isinstance(data, list) and data:
|
|
data = data[0]
|
|
try:
|
|
return int(data) & 1
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
|
|
def _clean_address(address: Any) -> str | None:
|
|
if not address:
|
|
return None
|
|
if isinstance(address, str):
|
|
stripped = address.strip()
|
|
return stripped or None
|
|
return None
|
|
|
|
|
|
def _clean_bool(value: Any) -> bool:
|
|
return bool(value)
|
|
|
|
|
|
def _invert_value(
|
|
value: int,
|
|
address: str | None,
|
|
address_options: dict[str, AddressOptions],
|
|
direction: str,
|
|
) -> int:
|
|
if address is None:
|
|
return value
|
|
options = address_options.get(address)
|
|
if options is None:
|
|
return value
|
|
if direction == "incoming":
|
|
if not options.invert_incoming:
|
|
return value
|
|
else:
|
|
if not options.invert_outgoing:
|
|
return value
|
|
if options.value_type == "percent":
|
|
return 100 - value
|
|
if value not in (0, 1):
|
|
return value
|
|
return 0 if value == 1 else 1
|
|
|
|
|
|
def _invert_in_key(address_key: str) -> str:
|
|
return f"{address_key}_{CONF_INVERT_INCOMING}"
|
|
|
|
|
|
def _invert_out_key(address_key: str) -> str:
|
|
return f"{address_key}_{CONF_INVERT_OUTGOING}"
|