Initial HA KNX Bridge scaffold

This commit is contained in:
2026-02-13 09:26:56 +01:00
commit 297d318e15
9 changed files with 605 additions and 0 deletions

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from .bridge import BridgeManager
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
manager = BridgeManager(hass, entry)
try:
await manager.async_setup()
except ConfigEntryNotReady:
await manager.async_unload()
raise
entry.runtime_data = manager
entry.async_on_unload(entry.add_update_listener(_update_listener))
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
manager: BridgeManager | None = getattr(entry, "runtime_data", None)
if manager is None:
return True
await manager.async_unload()
return True
async def _update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
await hass.config_entries.async_reload(entry.entry_id)

View File

@@ -0,0 +1,307 @@
from __future__ import annotations
from dataclasses import dataclass
import logging
from typing import Any
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import Event, HomeAssistant, State
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import event as event_helper
from .const import (
CONF_ANGLE_ADDRESS,
CONF_ANGLE_STATE_ADDRESS,
CONF_MOVE_LONG_ADDRESS,
CONF_MOVE_SHORT_ADDRESS,
CONF_POSITION_ADDRESS,
CONF_POSITION_STATE_ADDRESS,
CONF_STATE_ADDRESS,
CONF_STOP_ADDRESS,
)
_LOGGER = logging.getLogger(__name__)
KNX_DOMAIN = "knx"
@dataclass(frozen=True)
class BinarySensorPort:
entity_id: str
state_address: str | None
@dataclass(frozen=True)
class CoverPort:
entity_id: str
move_long_address: str | None
move_short_address: str | None
stop_address: str | None
position_address: str | None
position_state_address: str | None
angle_address: str | None
angle_state_address: str | None
class BridgeManager:
def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
self.hass = hass
self.entry = entry
self._unsub_listeners: list[callable] = []
self._knx_event_unsub: callable | None = None
self._address_handlers: dict[str, callable[[Event], Any]] = {}
self._registered_addresses: list[tuple[str, str | None]] = []
async def async_setup(self) -> None:
if not self.hass.services.has_service(KNX_DOMAIN, "send"):
raise ConfigEntryNotReady("KNX integration services not available")
binary_ports, cover_ports = self._load_ports()
self._register_outgoing(binary_ports, cover_ports)
await self._register_incoming(cover_ports)
async def async_unload(self) -> None:
for unsub in self._unsub_listeners:
unsub()
self._unsub_listeners.clear()
if self._knx_event_unsub is not None:
self._knx_event_unsub()
self._knx_event_unsub = None
await self._unregister_knx_events()
def _load_ports(self) -> tuple[list[BinarySensorPort], list[CoverPort]]:
binary_ports: list[BinarySensorPort] = []
cover_ports: list[CoverPort] = []
subentries = getattr(self.entry, "subentries", [])
for subentry in subentries:
data = subentry.data
if subentry.type == "binary_sensor":
binary_ports.append(
BinarySensorPort(
entity_id=data["entity_id"],
state_address=_clean_address(data.get(CONF_STATE_ADDRESS)),
)
)
elif subentry.type == "cover":
cover_ports.append(
CoverPort(
entity_id=data["entity_id"],
move_long_address=_clean_address(data.get(CONF_MOVE_LONG_ADDRESS)),
move_short_address=_clean_address(data.get(CONF_MOVE_SHORT_ADDRESS)),
stop_address=_clean_address(data.get(CONF_STOP_ADDRESS)),
position_address=_clean_address(data.get(CONF_POSITION_ADDRESS)),
position_state_address=_clean_address(
data.get(CONF_POSITION_STATE_ADDRESS)
),
angle_address=_clean_address(data.get(CONF_ANGLE_ADDRESS)),
angle_state_address=_clean_address(
data.get(CONF_ANGLE_STATE_ADDRESS)
),
)
)
return binary_ports, cover_ports
def _register_outgoing(
self, binary_ports: list[BinarySensorPort], cover_ports: list[CoverPort]
) -> None:
for port in binary_ports:
if not port.state_address:
continue
self._unsub_listeners.append(
event_helper.async_track_state_change_event(
self.hass, [port.entity_id], self._binary_sensor_changed(port)
)
)
for port in cover_ports:
if not (port.position_state_address or port.angle_state_address):
continue
self._unsub_listeners.append(
event_helper.async_track_state_change_event(
self.hass, [port.entity_id], self._cover_changed(port)
)
)
async def _register_incoming(self, cover_ports: list[CoverPort]) -> None:
for port in cover_ports:
self._register_knx_address(port.move_long_address, None, port, "move_long")
self._register_knx_address(port.move_short_address, None, port, "move_short")
self._register_knx_address(port.stop_address, None, port, "stop")
self._register_knx_address(
port.position_address, "percent", port, "position"
)
self._register_knx_address(
port.angle_address, "percent", port, "angle"
)
if self._address_handlers:
self._knx_event_unsub = event_helper.async_track_event(
self.hass, "knx_event", self._handle_knx_event
)
await self._register_knx_events()
def _register_knx_address(
self,
address: str | None,
value_type: str | None,
port: CoverPort,
action: str,
) -> None:
if not address:
return
self._address_handlers[address] = lambda event: self._handle_cover_action(
port, action, event
)
self._registered_addresses.append((address, value_type))
async def _register_knx_events(self) -> None:
for address, value_type in self._registered_addresses:
data: dict[str, Any] = {"address": address}
if value_type:
data["type"] = value_type
await self.hass.services.async_call(
KNX_DOMAIN, "event_register", data, blocking=False
)
async def _unregister_knx_events(self) -> None:
for address, value_type in self._registered_addresses:
data: dict[str, Any] = {"address": address, "remove": True}
if value_type:
data["type"] = value_type
await self.hass.services.async_call(
KNX_DOMAIN, "event_register", data, blocking=False
)
self._registered_addresses.clear()
def _binary_sensor_changed(self, port: BinarySensorPort) -> callable[[Event], Any]:
async def _handler(event: Event) -> None:
new_state: State | None = event.data.get("new_state")
if new_state is None:
return
if new_state.state not in ("on", "off"):
return
payload = 1 if new_state.state == "on" else 0
await self._knx_send_raw(port.state_address, payload)
return _handler
def _cover_changed(self, port: CoverPort) -> callable[[Event], Any]:
async def _handler(event: Event) -> None:
new_state: State | None = event.data.get("new_state")
if new_state is None:
return
if port.position_state_address is not None:
position = new_state.attributes.get("current_position")
if position is not None:
await self._knx_send_percent(port.position_state_address, position)
if port.angle_state_address is not None:
angle = new_state.attributes.get("current_tilt_position")
if angle is not None:
await self._knx_send_percent(port.angle_state_address, angle)
return _handler
async def _handle_knx_event(self, event: Event) -> None:
if event.data.get("direction") != "Incoming":
return
destination = event.data.get("destination")
if not destination:
return
handler = self._address_handlers.get(destination)
if handler is None:
return
await handler(event)
async def _handle_cover_action(
self, port: CoverPort, action: str, event: Event
) -> None:
value = _extract_event_value(event)
if value is None:
return
if action == "move_long":
if value == 0:
await self._call_cover_service(port.entity_id, "open_cover")
elif value == 1:
await self._call_cover_service(port.entity_id, "close_cover")
elif action == "move_short":
if value in (0, 1):
await self._call_cover_service(port.entity_id, "stop_cover")
elif action == "stop":
if value in (0, 1):
await self._call_cover_service(port.entity_id, "stop_cover")
elif action == "position":
await self._call_cover_service(
port.entity_id, "set_cover_position", {"position": value}
)
elif action == "angle":
await self._call_cover_service(
port.entity_id, "set_cover_tilt_position", {"tilt_position": value}
)
async def _call_cover_service(
self, entity_id: str, service: str, service_data: dict[str, Any] | None = None
) -> None:
data = {"entity_id": entity_id}
if service_data:
data.update(service_data)
await self.hass.services.async_call(
"cover", service, data, blocking=False
)
async def _knx_send_raw(self, address: str | None, payload: int) -> None:
if not address:
return
await self.hass.services.async_call(
KNX_DOMAIN,
"send",
{"address": address, "payload": payload, "payload_length": 0},
blocking=False,
)
async def _knx_send_percent(self, address: str | None, value: int) -> None:
if not address:
return
await self.hass.services.async_call(
KNX_DOMAIN,
"send",
{"address": address, "payload": value, "type": "percent"},
blocking=False,
)
def _extract_event_value(event: Event) -> int | None:
if "value" in event.data:
try:
return int(event.data["value"])
except (TypeError, ValueError):
return None
data = event.data.get("data")
if data is None:
return None
if isinstance(data, list) and data:
data = data[0]
try:
return int(data) & 1
except (TypeError, ValueError):
return None
def _clean_address(address: Any) -> str | None:
if not address:
return None
if isinstance(address, str):
stripped = address.strip()
return stripped or None
return None

View File

@@ -0,0 +1,134 @@
from __future__ import annotations
import logging
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import CONF_ENTITY_ID
from homeassistant.core import callback
from homeassistant.helpers import selector
from .const import (
CONF_ANGLE_ADDRESS,
CONF_ANGLE_STATE_ADDRESS,
CONF_KNX_ENTRY_ID,
CONF_MOVE_LONG_ADDRESS,
CONF_MOVE_SHORT_ADDRESS,
CONF_POSITION_ADDRESS,
CONF_POSITION_STATE_ADDRESS,
CONF_STATE_ADDRESS,
CONF_STOP_ADDRESS,
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
KNX_DOMAIN = "knx"
class HAKnxBridgeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
VERSION = 1
async def async_step_user(self, user_input: dict | None = None):
knx_entries = self.hass.config_entries.async_entries(KNX_DOMAIN)
if not knx_entries:
return self.async_abort(reason="knx_not_configured")
if user_input is not None:
return self.async_create_entry(
title="HA KNX Bridge",
data={CONF_KNX_ENTRY_ID: user_input[CONF_KNX_ENTRY_ID]},
)
options = [
{"value": entry.entry_id, "label": entry.title or entry.entry_id}
for entry in knx_entries
]
schema = vol.Schema(
{
vol.Required(CONF_KNX_ENTRY_ID): selector.SelectSelector(
selector.SelectSelectorConfig(options=options, mode="dropdown")
)
}
)
return self.async_show_form(step_id="user", data_schema=schema)
@staticmethod
@callback
def async_get_supported_subentry_types(config_entry):
return {
"binary_sensor": config_entries.SubentryType(
name="Binary Sensor Port", flow_class=BinarySensorPortSubentryFlow
),
"cover": config_entries.SubentryType(
name="Cover Port", flow_class=CoverPortSubentryFlow
),
}
class BinarySensorPortSubentryFlow(config_entries.ConfigSubentryFlow):
VERSION = 1
async def async_step_user(self, user_input: dict | None = None):
if user_input is not None:
title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
return self.async_create_entry(title=title, data=user_input)
schema = vol.Schema(
{
vol.Required(CONF_ENTITY_ID): selector.EntitySelector(
selector.EntitySelectorConfig(domain=["binary_sensor"])
),
vol.Optional(CONF_STATE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
}
)
return self.async_show_form(step_id="user", data_schema=schema)
class CoverPortSubentryFlow(config_entries.ConfigSubentryFlow):
VERSION = 1
async def async_step_user(self, user_input: dict | None = None):
if user_input is not None:
title = _entity_title(self.hass, user_input[CONF_ENTITY_ID])
return self.async_create_entry(title=title, data=user_input)
schema = vol.Schema(
{
vol.Required(CONF_ENTITY_ID): selector.EntitySelector(
selector.EntitySelectorConfig(domain=["cover"])
),
vol.Optional(CONF_MOVE_LONG_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(CONF_MOVE_SHORT_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(CONF_STOP_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(CONF_POSITION_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(CONF_POSITION_STATE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(CONF_ANGLE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
vol.Optional(CONF_ANGLE_STATE_ADDRESS): selector.TextSelector(
selector.TextSelectorConfig(type="text")
),
}
)
return self.async_show_form(step_id="user", data_schema=schema)
def _entity_title(hass, entity_id: str) -> str:
state = hass.states.get(entity_id)
if state is None:
return entity_id
return state.attributes.get("friendly_name", entity_id)

View File

@@ -0,0 +1,13 @@
DOMAIN = "ha_knx_bridge"
CONF_KNX_ENTRY_ID = "knx_entry_id"
CONF_STATE_ADDRESS = "state_address"
CONF_MOVE_LONG_ADDRESS = "move_long_address"
CONF_MOVE_SHORT_ADDRESS = "move_short_address"
CONF_STOP_ADDRESS = "stop_address"
CONF_POSITION_ADDRESS = "position_address"
CONF_POSITION_STATE_ADDRESS = "position_state_address"
CONF_ANGLE_ADDRESS = "angle_address"
CONF_ANGLE_STATE_ADDRESS = "angle_state_address"

View File

@@ -0,0 +1,13 @@
{
"domain": "ha_knx_bridge",
"name": "HA KNX Bridge",
"version": "0.1.0",
"config_flow": true,
"documentation": "https://github.com/OWNER/REPO",
"issue_tracker": "https://github.com/OWNER/REPO/issues",
"codeowners": ["@your-github-handle"],
"dependencies": ["knx"],
"after_dependencies": ["knx"],
"homeassistant": "2025.12.0",
"iot_class": "local_push"
}

View File

@@ -0,0 +1,46 @@
{
"config": {
"abort": {
"knx_not_configured": "Set up the Home Assistant KNX integration first."
},
"step": {
"user": {
"title": "HA KNX Bridge",
"description": "Select the Home Assistant KNX integration entry to use.",
"data": {
"knx_entry_id": "KNX integration"
}
}
}
},
"config_subentries": {
"binary_sensor": {
"step": {
"user": {
"title": "Binary Sensor Port",
"data": {
"entity_id": "Binary sensor entity",
"state_address": "State group address (DPT 1)"
}
}
}
},
"cover": {
"step": {
"user": {
"title": "Cover Port",
"data": {
"entity_id": "Cover entity",
"move_long_address": "Move long (DPT 1.008 Up/Down)",
"move_short_address": "Move short (DPT 1.007 Step)",
"stop_address": "Stop (DPT 1)",
"position_address": "Set position (DPT 5.001)",
"position_state_address": "State position (DPT 5.001)",
"angle_address": "Set tilt (DPT 5.001)",
"angle_state_address": "State tilt (DPT 5.001)"
}
}
}
}
}
}