mirror of
https://github.com/bahmcloud/HA-KNX-Bridge.git
synced 2026-04-06 16:51:14 +00:00
Add switch port support
This commit is contained in:
@@ -12,6 +12,7 @@ from homeassistant.helpers import event as event_helper
|
||||
from .const import (
|
||||
CONF_ANGLE_ADDRESS,
|
||||
CONF_ANGLE_STATE_ADDRESS,
|
||||
CONF_COMMAND_ADDRESS,
|
||||
CONF_MOVE_LONG_ADDRESS,
|
||||
CONF_MOVE_SHORT_ADDRESS,
|
||||
CONF_POSITION_ADDRESS,
|
||||
@@ -43,6 +44,13 @@ class CoverPort:
|
||||
angle_state_address: str | None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SwitchPort:
|
||||
entity_id: str
|
||||
command_address: str | None
|
||||
state_address: str | None
|
||||
|
||||
|
||||
class BridgeManager:
|
||||
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
|
||||
self.hass = hass
|
||||
@@ -56,9 +64,9 @@ class BridgeManager:
|
||||
if not self.hass.services.has_service(KNX_DOMAIN, "send"):
|
||||
raise ConfigEntryNotReady("KNX integration services not available")
|
||||
|
||||
binary_ports, cover_ports = self._load_ports()
|
||||
self._register_outgoing(binary_ports, cover_ports)
|
||||
await self._register_incoming(cover_ports)
|
||||
binary_ports, switch_ports, cover_ports = self._load_ports()
|
||||
self._register_outgoing(binary_ports, switch_ports, cover_ports)
|
||||
await self._register_incoming(switch_ports, cover_ports)
|
||||
|
||||
async def async_unload(self) -> None:
|
||||
for unsub in self._unsub_listeners:
|
||||
@@ -71,8 +79,11 @@ class BridgeManager:
|
||||
|
||||
await self._unregister_knx_events()
|
||||
|
||||
def _load_ports(self) -> tuple[list[BinarySensorPort], list[CoverPort]]:
|
||||
def _load_ports(
|
||||
self,
|
||||
) -> tuple[list[BinarySensorPort], list[SwitchPort], list[CoverPort]]:
|
||||
binary_ports: list[BinarySensorPort] = []
|
||||
switch_ports: list[SwitchPort] = []
|
||||
cover_ports: list[CoverPort] = []
|
||||
|
||||
subentries = getattr(self.entry, "subentries", [])
|
||||
@@ -85,6 +96,16 @@ class BridgeManager:
|
||||
state_address=_clean_address(data.get(CONF_STATE_ADDRESS)),
|
||||
)
|
||||
)
|
||||
elif subentry.type == "switch":
|
||||
switch_ports.append(
|
||||
SwitchPort(
|
||||
entity_id=data["entity_id"],
|
||||
command_address=_clean_address(
|
||||
data.get(CONF_COMMAND_ADDRESS)
|
||||
),
|
||||
state_address=_clean_address(data.get(CONF_STATE_ADDRESS)),
|
||||
)
|
||||
)
|
||||
elif subentry.type == "cover":
|
||||
cover_ports.append(
|
||||
CoverPort(
|
||||
@@ -103,10 +124,13 @@ class BridgeManager:
|
||||
)
|
||||
)
|
||||
|
||||
return binary_ports, cover_ports
|
||||
return binary_ports, switch_ports, cover_ports
|
||||
|
||||
def _register_outgoing(
|
||||
self, binary_ports: list[BinarySensorPort], cover_ports: list[CoverPort]
|
||||
self,
|
||||
binary_ports: list[BinarySensorPort],
|
||||
switch_ports: list[SwitchPort],
|
||||
cover_ports: list[CoverPort],
|
||||
) -> None:
|
||||
for port in binary_ports:
|
||||
if not port.state_address:
|
||||
@@ -117,6 +141,15 @@ class BridgeManager:
|
||||
)
|
||||
)
|
||||
|
||||
for port in switch_ports:
|
||||
if not port.state_address:
|
||||
continue
|
||||
self._unsub_listeners.append(
|
||||
event_helper.async_track_state_change_event(
|
||||
self.hass, [port.entity_id], self._switch_changed(port)
|
||||
)
|
||||
)
|
||||
|
||||
for port in cover_ports:
|
||||
if not (port.position_state_address or port.angle_state_address):
|
||||
continue
|
||||
@@ -126,7 +159,12 @@ class BridgeManager:
|
||||
)
|
||||
)
|
||||
|
||||
async def _register_incoming(self, cover_ports: list[CoverPort]) -> None:
|
||||
async def _register_incoming(
|
||||
self, switch_ports: list[SwitchPort], cover_ports: list[CoverPort]
|
||||
) -> None:
|
||||
for port in switch_ports:
|
||||
self._register_knx_switch_address(port.command_address, port)
|
||||
|
||||
for port in cover_ports:
|
||||
self._register_knx_address(port.move_long_address, None, port, "move_long")
|
||||
self._register_knx_address(port.move_short_address, None, port, "move_short")
|
||||
@@ -160,6 +198,18 @@ class BridgeManager:
|
||||
)
|
||||
self._registered_addresses.append((address, value_type))
|
||||
|
||||
def _register_knx_switch_address(
|
||||
self,
|
||||
address: str | None,
|
||||
port: SwitchPort,
|
||||
) -> None:
|
||||
if not address:
|
||||
return
|
||||
self._address_handlers[address] = lambda event: self._handle_switch_action(
|
||||
port, event
|
||||
)
|
||||
self._registered_addresses.append((address, None))
|
||||
|
||||
async def _register_knx_events(self) -> None:
|
||||
for address, value_type in self._registered_addresses:
|
||||
data: dict[str, Any] = {"address": address}
|
||||
@@ -191,6 +241,18 @@ class BridgeManager:
|
||||
|
||||
return _handler
|
||||
|
||||
def _switch_changed(self, port: SwitchPort) -> callable[[Event], Any]:
|
||||
async def _handler(event: Event) -> None:
|
||||
new_state: State | None = event.data.get("new_state")
|
||||
if new_state is None:
|
||||
return
|
||||
if new_state.state not in ("on", "off"):
|
||||
return
|
||||
payload = 1 if new_state.state == "on" else 0
|
||||
await self._knx_send_raw(port.state_address, payload)
|
||||
|
||||
return _handler
|
||||
|
||||
def _cover_changed(self, port: CoverPort) -> callable[[Event], Any]:
|
||||
async def _handler(event: Event) -> None:
|
||||
new_state: State | None = event.data.get("new_state")
|
||||
@@ -250,6 +312,15 @@ class BridgeManager:
|
||||
port.entity_id, "set_cover_tilt_position", {"tilt_position": value}
|
||||
)
|
||||
|
||||
async def _handle_switch_action(self, port: SwitchPort, event: Event) -> None:
|
||||
value = _extract_event_value(event)
|
||||
if value is None:
|
||||
return
|
||||
if value == 0:
|
||||
await self._call_switch_service(port.entity_id, "turn_off")
|
||||
elif value == 1:
|
||||
await self._call_switch_service(port.entity_id, "turn_on")
|
||||
|
||||
async def _call_cover_service(
|
||||
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
|
||||
) -> None:
|
||||
@@ -260,6 +331,16 @@ class BridgeManager:
|
||||
"cover", service, data, blocking=False
|
||||
)
|
||||
|
||||
async def _call_switch_service(
|
||||
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
|
||||
) -> None:
|
||||
data = {"entity_id": entity_id}
|
||||
if service_data:
|
||||
data.update(service_data)
|
||||
await self.hass.services.async_call(
|
||||
"switch", service, data, blocking=False
|
||||
)
|
||||
|
||||
async def _knx_send_raw(self, address: str | None, payload: int) -> None:
|
||||
if not address:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user