Improve KNX cover command decoding

This commit is contained in:
2026-02-13 20:23:01 +01:00
parent 0fa390fd7f
commit 22afc5addf
5 changed files with 73 additions and 16 deletions

View File

@@ -12,6 +12,7 @@ from homeassistant.helpers import event as event_helper
from .const import (
CONF_ANGLE_ADDRESS,
CONF_ANGLE_STATE_ADDRESS,
ADDRESS_EVENT_TYPE,
ADDRESS_VALUE_TYPE,
CONF_COMMAND_ADDRESS,
CONF_INVERT_INCOMING,
@@ -356,10 +357,11 @@ class BridgeManager:
port, action, event
)
value_type = _get_value_type(address_key)
event_type = _get_event_type(address_key)
self._remember_address_options(
address, value_type, invert_incoming, invert_outgoing
)
self._registered_addresses.append((address, value_type))
self._registered_addresses.append((address, event_type))
def _register_knx_switch_address(
self,
@@ -379,7 +381,7 @@ class BridgeManager:
invert_incoming,
invert_outgoing,
)
self._registered_addresses.append((address, None))
self._registered_addresses.append((address, _get_event_type(CONF_COMMAND_ADDRESS)))
def _remember_address_options(
self,
@@ -395,19 +397,19 @@ class BridgeManager:
)
async def _register_knx_events(self) -> None:
for address, value_type in self._registered_addresses:
for address, event_type in self._registered_addresses:
data: dict[str, Any] = {"address": address}
if value_type:
data["type"] = value_type
if event_type:
data["type"] = event_type
await self.hass.services.async_call(
KNX_DOMAIN, "event_register", data, blocking=False
)
async def _unregister_knx_events(self) -> None:
for address, value_type in self._registered_addresses:
for address, event_type in self._registered_addresses:
data: dict[str, Any] = {"address": address, "remove": True}
if value_type:
data["type"] = value_type
if event_type:
data["type"] = event_type
await self.hass.services.async_call(
KNX_DOMAIN, "event_register", data, blocking=False
)
@@ -476,10 +478,15 @@ class BridgeManager:
return _handler
async def _handle_knx_event(self, event: Event) -> None:
if event.data.get("direction") != "Incoming":
direction = event.data.get("direction")
if direction is not None and not str(direction).lower().startswith("incoming"):
return
destination = event.data.get("destination")
destination = (
event.data.get("destination")
or event.data.get("destination_address")
or event.data.get("address")
)
if not destination:
return
@@ -564,6 +571,7 @@ class BridgeManager:
async def _knx_send_percent(self, address: str | None, value: int) -> None:
if not address:
return
value = _clamp_percent(value)
await self.hass.services.async_call(
KNX_DOMAIN,
"send",
@@ -574,15 +582,18 @@ class BridgeManager:
def _extract_event_value(event: Event) -> int | None:
if "value" in event.data:
try:
return int(event.data["value"])
except (TypeError, ValueError):
return None
value = event.data["value"]
mapped = _map_scalar_value(value)
if mapped is not None:
return mapped
data = event.data.get("data")
if data is None:
return None
if isinstance(data, list) and data:
data = data[0]
mapped = _map_scalar_value(data)
if mapped is not None:
return mapped
try:
return int(data) & 1
except (TypeError, ValueError):
@@ -620,6 +631,7 @@ def _invert_value(
if not options.invert_outgoing:
return value
if options.value_type == "percent":
value = _clamp_percent(value)
return 100 - value
if value not in (0, 1):
return value
@@ -638,6 +650,36 @@ def _get_value_type(address_key: str) -> str | None:
return ADDRESS_VALUE_TYPE.get(address_key)
def _get_event_type(address_key: str) -> str | None:
return ADDRESS_EVENT_TYPE.get(address_key)
def _clamp_percent(value: int) -> int:
if value < 0:
return 0
if value > 100:
return 100
return value
def _map_scalar_value(value: Any) -> int | None:
if isinstance(value, bool):
return 1 if value else 0
if isinstance(value, (int, float)):
return int(value)
if isinstance(value, str):
text = value.strip().lower()
if text in ("on", "true", "yes", "1", "down", "close", "closed"):
return 1
if text in ("off", "false", "no", "0", "up", "open", "opened"):
return 0
try:
return int(text)
except ValueError:
return None
return None
def _iter_port_configs(entry: ConfigEntry) -> list[tuple[str, dict[str, Any]]]:
ports: list[tuple[str, dict[str, Any]]] = []
subentries = getattr(entry, "subentries", [])

View File

@@ -35,3 +35,12 @@ ADDRESS_VALUE_TYPE: dict[str, str] = {
CONF_ANGLE_ADDRESS: "percent",
CONF_ANGLE_STATE_ADDRESS: "percent",
}
ADDRESS_EVENT_TYPE: dict[str, str] = {
CONF_MOVE_LONG_ADDRESS: "up_down",
CONF_MOVE_SHORT_ADDRESS: "step",
CONF_POSITION_ADDRESS: "percent",
CONF_POSITION_STATE_ADDRESS: "percent",
CONF_ANGLE_ADDRESS: "percent",
CONF_ANGLE_STATE_ADDRESS: "percent",
}

View File

@@ -1,7 +1,7 @@
{
"domain": "ha_knx_bridge",
"name": "HA KNX Bridge",
"version": "0.0.12",
"version": "0.0.14",
"config_flow": true,
"documentation": "https://github.com/bahmcloud/HA-KNX-Bridge",
"issue_tracker": "https://github.com/bahmcloud/HA-KNX-Bridge/issues",