mirror of
https://github.com/bahmcloud/HA-KNX-Bridge.git
synced 2026-04-06 18:01:14 +00:00
234 lines
7.5 KiB
Python
234 lines
7.5 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
import voluptuous as vol
|
|
|
|
from homeassistant import config_entries
|
|
from homeassistant.const import CONF_ENTITY_ID
|
|
from homeassistant.core import callback
|
|
from homeassistant.helpers import selector
|
|
|
|
from .const import (
|
|
CONF_ANGLE_ADDRESS,
|
|
CONF_ANGLE_STATE_ADDRESS,
|
|
CONF_KNX_ENTRY_ID,
|
|
CONF_MOVE_LONG_ADDRESS,
|
|
CONF_MOVE_SHORT_ADDRESS,
|
|
CONF_POSITION_ADDRESS,
|
|
CONF_POSITION_STATE_ADDRESS,
|
|
CONF_STATE_ADDRESS,
|
|
CONF_STOP_ADDRESS,
|
|
DOMAIN,
|
|
)
|
|
|
|
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
# Integration domain used to look up the existing KNX config entries.
KNX_DOMAIN = "knx"
|
|
|
|
|
|
class HAKnxBridgeConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
    """Config flow that binds a bridge entry to one existing KNX config entry."""

    VERSION = 1

    async def async_step_user(self, user_input: dict | None = None):
        """Let the user pick the KNX config entry this bridge should use.

        Aborts with reason ``knx_not_configured`` when no KNX config entry
        exists. Once the form is submitted, an entry titled "HA KNX Bridge" is
        created that stores only the selected KNX entry id.
        """
        knx_entries = self.hass.config_entries.async_entries(KNX_DOMAIN)
        if not knx_entries:
            return self.async_abort(reason="knx_not_configured")

        if user_input is not None:
            return self.async_create_entry(
                title="HA KNX Bridge",
                data={CONF_KNX_ENTRY_ID: user_input[CONF_KNX_ENTRY_ID]},
            )

        # One dropdown option per KNX entry; fall back to the entry id as the
        # label when the entry has no title.
        options = [
            {"value": entry.entry_id, "label": entry.title or entry.entry_id}
            for entry in knx_entries
        ]
        schema = vol.Schema(
            {
                vol.Required(CONF_KNX_ENTRY_ID): selector.SelectSelector(
                    selector.SelectSelectorConfig(options=options, mode="dropdown")
                )
            }
        )
        return self.async_show_form(step_id="user", data_schema=schema)

    @staticmethod
    @callback
    def async_get_options_flow(config_entry):
        """Return the options flow handler for ``config_entry``."""
        return HAKnxBridgeOptionsFlow(config_entry)

    @classmethod
    @callback
    def async_get_supported_subentry_types(
        cls, config_entry
    ) -> dict[str, type[config_entries.ConfigSubentryFlow]]:
        """Map each supported subentry type to its flow handler class.

        Home Assistant instantiates the mapped value directly, so the values
        must be the ``ConfigSubentryFlow`` subclasses themselves rather than
        wrapper objects.
        """
        # NOTE(review): the labels "Binary Sensor Port" / "Cover Port" that were
        # previously attached here presumably belong in the integration's
        # translation strings - confirm they are defined there.
        return {
            "binary_sensor": BinarySensorPortSubentryFlow,
            "cover": CoverPortSubentryFlow,
        }
|
|
|
|
|
|
class HAKnxBridgeOptionsFlow(config_entries.OptionsFlow):
    """Options flow for the bridge; its form currently has no fields."""

    def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
        """Remember the config entry whose options are being edited."""
        self._config_entry = config_entry

    async def async_step_init(self, user_input: dict | None = None):
        """Show the (empty) options form, or store the submitted options."""
        if user_input is None:
            # Nothing submitted yet: present the form with an empty schema.
            return self.async_show_form(step_id="init", data_schema=vol.Schema({}))

        return self.async_create_entry(title="", data=user_input)
|
|
|
|
|
|
class BinarySensorPortSubentryFlow(config_entries.ConfigSubentryFlow):
    """Subentry flow that configures one binary sensor port."""

    VERSION = 1

    async def async_step_user(self, user_input: dict | None = None):
        """Collect the entity and its optional state group address.

        The submitted state address is validated and normalized. On a
        validation error the form is shown again with the error attached and
        the user's previous input pre-filled, so nothing has to be retyped.
        On success a subentry titled after the selected entity is created.
        """
        if user_input is None:
            return self.async_show_form(
                step_id="user", data_schema=_binary_sensor_schema()
            )

        cleaned, errors = _validate_knx_addresses(user_input, [CONF_STATE_ADDRESS])
        if errors:
            # Re-show the form with what the user typed as suggested values;
            # a fresh schema alone would wipe every field.
            schema = self.add_suggested_values_to_schema(
                _binary_sensor_schema(), user_input
            )
            return self.async_show_form(
                step_id="user", data_schema=schema, errors=errors
            )

        title = _entity_title(self.hass, cleaned[CONF_ENTITY_ID])
        return self.async_create_entry(title=title, data=cleaned)
|
|
|
|
|
|
class CoverPortSubentryFlow(config_entries.ConfigSubentryFlow):
    """Subentry flow that configures one cover port."""

    VERSION = 1

    async def async_step_user(self, user_input: dict | None = None):
        """Collect the cover entity and its optional KNX group addresses.

        Every submitted address field is validated and normalized. On a
        validation error the form is shown again with the errors attached and
        the user's previous input pre-filled, so a typo in one address does
        not discard the other fields. On success a subentry titled after the
        selected entity is created.
        """
        if user_input is None:
            return self.async_show_form(step_id="user", data_schema=_cover_schema())

        address_keys = [
            CONF_MOVE_LONG_ADDRESS,
            CONF_MOVE_SHORT_ADDRESS,
            CONF_STOP_ADDRESS,
            CONF_POSITION_ADDRESS,
            CONF_POSITION_STATE_ADDRESS,
            CONF_ANGLE_ADDRESS,
            CONF_ANGLE_STATE_ADDRESS,
        ]
        cleaned, errors = _validate_knx_addresses(user_input, address_keys)
        if errors:
            # Re-show the form with what the user typed as suggested values;
            # a fresh schema alone would wipe every field.
            schema = self.add_suggested_values_to_schema(_cover_schema(), user_input)
            return self.async_show_form(
                step_id="user", data_schema=schema, errors=errors
            )

        title = _entity_title(self.hass, cleaned[CONF_ENTITY_ID])
        return self.async_create_entry(title=title, data=cleaned)
|
|
|
|
|
|
def _entity_title(hass, entity_id: str) -> str:
    """Return a display title for ``entity_id``.

    Uses the ``friendly_name`` attribute of the entity's current state when it
    is set to a non-empty value. Falls back to ``entity_id`` when the entity
    has no state, or when the attribute is missing, ``None`` or empty, so the
    result is always a usable, non-blank string.
    """
    state = hass.states.get(entity_id)
    if state is None:
        return entity_id
    friendly_name = state.attributes.get("friendly_name")
    # A present-but-empty/None friendly_name must not become the title.
    return str(friendly_name) if friendly_name else entity_id
|
|
|
|
|
|
def _binary_sensor_schema() -> vol.Schema:
    """Build the form schema for a binary sensor port.

    The schema has a required entity picker limited to the ``binary_sensor``
    domain and an optional free-text field for the state address.
    """
    entity_picker = selector.EntitySelector(
        selector.EntitySelectorConfig(domain=["binary_sensor"])
    )
    address_field = selector.TextSelector(selector.TextSelectorConfig(type="text"))
    fields = {
        vol.Required(CONF_ENTITY_ID): entity_picker,
        vol.Optional(CONF_STATE_ADDRESS): address_field,
    }
    return vol.Schema(fields)
|
|
|
|
|
|
def _cover_schema() -> vol.Schema:
    """Build the form schema for a cover port.

    The schema has a required entity picker limited to the ``cover`` domain,
    followed by one optional free-text field per cover address key.
    """
    fields = {
        vol.Required(CONF_ENTITY_ID): selector.EntitySelector(
            selector.EntitySelectorConfig(domain=["cover"])
        ),
    }
    # Field order in the form follows the order of this tuple.
    optional_address_keys = (
        CONF_MOVE_LONG_ADDRESS,
        CONF_MOVE_SHORT_ADDRESS,
        CONF_STOP_ADDRESS,
        CONF_POSITION_ADDRESS,
        CONF_POSITION_STATE_ADDRESS,
        CONF_ANGLE_ADDRESS,
        CONF_ANGLE_STATE_ADDRESS,
    )
    for address_key in optional_address_keys:
        # Each field gets its own selector instance.
        fields[vol.Optional(address_key)] = selector.TextSelector(
            selector.TextSelectorConfig(type="text")
        )
    return vol.Schema(fields)
|
|
|
|
|
|
def _validate_knx_addresses(
    user_input: dict, keys: list[str]
) -> tuple[dict, dict[str, str]]:
    """Validate and normalize the group-address fields of a form submission.

    Works on a shallow copy of ``user_input``; the caller's dict is not
    modified. For each entry of ``keys`` that is present in the input:

    * a ``None`` or blank value removes the key from the result,
    * a valid address is replaced by its normalized form,
    * an invalid address is left as submitted and reported as ``"invalid_ga"``.

    Returns:
        A ``(cleaned_input, errors)`` tuple, where ``errors`` maps each
        offending key to its error code and is empty when everything is valid.
    """
    result = dict(user_input)
    problems: dict[str, str] = {}

    for field in keys:
        if field not in result:
            continue

        raw = result[field]
        normalized = None
        if raw is not None:
            try:
                normalized = _normalize_group_address(str(raw))
            except ValueError:
                problems[field] = "invalid_ga"
                continue

        if normalized:
            result[field] = normalized
        else:
            # None or blank: treat the field as not provided.
            del result[field]

    return result, problems
|
|
|
|
|
|
def _normalize_group_address(value: str) -> str:
    """Return the canonical text form of a KNX group address.

    Accepts the two-level form ``main/sub`` (main 0-31, sub 0-2047) and the
    three-level form ``main/middle/sub`` (main 0-31, middle 0-7, sub 0-255).
    Whitespace around the address and around each part is ignored.

    Returns:
        The address with each part rendered as a plain integer, or ``""`` when
        ``value`` is blank.

    Raises:
        ValueError: If the number of parts is not 2 or 3, a part is not an
            integer, or a part is outside its allowed range.
    """
    stripped = value.strip()
    if stripped == "":
        return ""

    pieces = stripped.split("/")
    if len(pieces) not in (2, 3):
        raise ValueError("invalid group address format")

    numbers = [_parse_int(piece) for piece in pieces]
    # Upper bound of each part, chosen by address style.
    upper_bounds = (31, 2047) if len(numbers) == 2 else (31, 7, 255)
    for number, upper in zip(numbers, upper_bounds):
        if number < 0 or number > upper:
            raise ValueError("group address out of range")

    return "/".join(str(number) for number in numbers)
|
|
|
|
|
|
def _parse_int(value: str) -> int:
    """Parse one part of a group address as a non-negative decimal integer.

    Surrounding whitespace is ignored. Only plain ASCII digits are accepted.

    Raises:
        ValueError: If the part is empty or contains anything other than
            ASCII digits.
    """
    text = value.strip()
    if text == "":
        raise ValueError("empty group address part")
    # int() on its own also accepts signs ("+1"), digit separators ("1_0") and
    # non-ASCII digits, none of which are valid in a group address.
    if not (text.isascii() and text.isdigit()):
        raise ValueError(f"invalid group address part: {text!r}")
    return int(text)
|