diff --git a/bellows/ezsp/v13/commands.py b/bellows/ezsp/v13/commands.py index f87664ae..a67e129c 100644 --- a/bellows/ezsp/v13/commands.py +++ b/bellows/ezsp/v13/commands.py @@ -90,6 +90,28 @@ "status": t.sl_Status, }, ), + # status is kept as uint8_t (not sl_GpStatus) to accept values + # outside the strict 0x00..0x07 range that NCPs return for frames + # with no matching proxy/sink entry. + "gpepIncomingMessageHandler": ( + 0x00C5, + {}, + { + "status": t.uint8_t, + "gpdLink": t.uint8_t, + "sequenceNumber": t.uint8_t, + "addr": t.EmberGpAddress, + "gpdfSecurityLevel": t.EmberGpSecurityLevel, + "gpdfSecurityKeyType": t.EmberGpKeyType, + "autoCommissioning": t.Bool, + "bidirectionalInfo": t.uint8_t, + "gpdSecurityFrameCounter": t.uint32_t, + "gpdCommandId": t.uint8_t, + "mic": t.uint32_t, + "proxyTableIndex": t.uint8_t, + "gpdCommandPayload": t.LVBytes, + }, + ), } del COMMANDS["becomeTrustCenter"] # this one was likely removed earlier diff --git a/bellows/ezsp/v16/commands.py b/bellows/ezsp/v16/commands.py index 51dc4248..4466fb0d 100644 --- a/bellows/ezsp/v16/commands.py +++ b/bellows/ezsp/v16/commands.py @@ -1,5 +1,28 @@ +import bellows.types as t + from ..v14.commands import COMMANDS as COMMANDS_v14 COMMANDS = { **COMMANDS_v14, + # v16 appends an SlRxPacketInfo trailer to the v13/v14 layout. + "gpepIncomingMessageHandler": ( + 0x00C5, + {}, + { + "status": t.uint8_t, + "gpdLink": t.uint8_t, + "sequenceNumber": t.uint8_t, + "addr": t.EmberGpAddress, + "gpdfSecurityLevel": t.EmberGpSecurityLevel, + "gpdfSecurityKeyType": t.EmberGpKeyType, + "autoCommissioning": t.Bool, + "bidirectionalInfo": t.uint8_t, + "gpdSecurityFrameCounter": t.uint32_t, + "gpdCommandId": t.uint8_t, + "mic": t.uint32_t, + "proxyTableIndex": t.uint8_t, + "gpdCommandPayload": t.LVBytes, + "packetInfo": t.SlRxPacketInfo, + }, + ), } diff --git a/bellows/types/struct.py b/bellows/types/struct.py index 91a17c01..a8b59ccd 100644 --- a/bellows/types/struct.py +++ b/bellows/types/struct.py @@ -357,14 +357,19 @@ class EmberTokTypeStackZllSecurity(EzspStruct): class EmberGpAddress(EzspStruct): - # A GP address structure. - # The GPD's EUI64. - gpdIeeeAddress: named.EUI64 - # The GPD's source ID. - sourceId: basic.uint32_t - # The GPD Application ID. + """sl_zigbee_gp_address_t: GPD address used by the GP callbacks. + + The C SDK declares the union ``id`` field first, but the EZSP wire + layout for ``gpepIncomingMessageHandler`` serializes ``applicationId`` + first. The order below matches what the NCP actually sends; see the + real-frame test in ``tests/test_ezsp_v13.py``. + """ + applicationId: basic.uint8_t - # The GPD endpoint. + # 8-byte union. When applicationId == 0 (SrcID), the first 4 bytes + # hold the 32-bit source ID (little-endian) and the rest is padding; + # when applicationId == 2 (IEEE), the 8 bytes are the GPD EUI64. + id: basic.FixedList[basic.uint8_t, 8] endpoint: basic.uint8_t diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 1bd0271c..5a1559ff 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -21,7 +21,10 @@ import zigpy.state import zigpy.types import zigpy.util +from zigpy.zcl import foundation +from zigpy.zcl.clusters.greenpower import NotificationOptions, NotificationSchema import zigpy.zdo.types as zdo_t +import zigpy.zgp.types as zgp_t import bellows from bellows.config import ( @@ -56,6 +59,7 @@ COUNTER_RX_BCAST = "broadcast_rx" COUNTER_RX_MCAST = "multicast_rx" COUNTER_RX_UNICAST = "unicast_rx" +COUNTER_RX_GP = "green_power_rx" COUNTER_UNKNOWN_DEVICE = "unknown_device_rx" COUNTER_WATCHDOG = "watchdog_reset_requests" COUNTERS_EZSP = "ezsp_counters" @@ -75,6 +79,12 @@ DEFAULT_TX_POWER = 8 # dBm +# TODO: replace with zigpy.zgp.GP_ENDPOINT / GP_CLUSTER_ID / GP_PROFILE_ID +# once those are released upstream. +GP_ENDPOINT = 242 +GP_CLUSTER_ID = 0x0021 +GP_PROFILE_ID = 0xA1E0 + LIB_VERSION = importlib.metadata.version("bellows") LOGGER = logging.getLogger(__name__) @@ -696,6 +706,109 @@ def ezsp_callback_handler(self, frame_name, args): self.handle_route_error(status, nwk) elif frame_name == "idConflictHandler": self._handle_id_conflict(*args) + elif frame_name == "gpepIncomingMessageHandler": + self._handle_gp_frame(args) + + def _handle_gp_frame(self, args: tuple) -> None: + """Forward gpepIncomingMessageHandler as a ZCL GP Notification.""" + if len(args) < 13: + LOGGER.debug("gpepIncomingMessageHandler: short args %r, dropping", args) + return + + ( + _status, + gpd_link, + sequence_number, + addr, + gpdf_security_level, + gpdf_security_key_type, + _auto_commissioning, + _bidirectional_info, + gpd_security_frame_counter, + gpd_command_id, + _mic, + _proxy_table_index, + gpd_command_payload, + *rest, + ) = args + + # On EZSP < v13 the 4th argument is a uint8_t addrType, not an + # EmberGpAddress. Drop those frames rather than crashing on + # attribute access; bellows does not currently parse the legacy + # layout. + if not isinstance(addr, t.EmberGpAddress): + LOGGER.debug( + "gpepIncomingMessageHandler: unsupported legacy address layout, " + "dropping" + ) + return + + if addr.applicationId != zgp_t.ApplicationID.SrcID: + LOGGER.debug( + "GP frame with unsupported applicationId %s, dropping", + addr.applicationId, + ) + return + + source_id = int.from_bytes(bytes(addr.id[:4]), "little") + # v16+ appends an SlRxPacketInfo after the payload. + packet_info = rest[0] if rest else None + + options = NotificationOptions( + application_id=zgp_t.ApplicationID(addr.applicationId), + also_unicast=0, + also_derived_group=0, + also_commissioned_group=0, + security_level=zgp_t.SecurityLevel(gpdf_security_level), + security_key_type=zgp_t.SecurityKeyType(gpdf_security_key_type), + appoint_temp_master=0, + tx_queue_full=0, + _reserved=0, + ) + notification = NotificationSchema( + options=options, + gpd_id=zgp_t.DeviceID(source_id), + frame_counter=zigpy.types.uint32_t(gpd_security_frame_counter), + command_id=zigpy.types.uint8_t(gpd_command_id), + payload=zigpy.types.LVBytes(bytes(gpd_command_payload)), + ) + tsn = zigpy.types.uint8_t(int(sequence_number) & 0xFF) + zcl_header = foundation.ZCLHeader.cluster(tsn=tsn, command_id=0x00) + zcl_bytes = zcl_header.serialize() + notification.serialize() + + if packet_info is not None: + proxy_nwk = int(packet_info.sender_short_id) + lqi = int(packet_info.last_hop_lqi) + rssi = int(packet_info.last_hop_rssi) + else: + # No SlRxPacketInfo on v13/v14: the coordinator stands in as + # the proxy and gpdLink is the only signal-quality byte. + proxy_nwk = int(self.state.node_info.nwk) + lqi = int(gpd_link) + rssi = 0 + + self.state.counters[COUNTERS_CTRL][COUNTER_RX_GP].increment() + + self.packet_received( + zigpy.types.ZigbeePacket( + src=zigpy.types.AddrModeAddress( + addr_mode=zigpy.types.AddrMode.NWK, + address=zigpy.types.NWK(proxy_nwk), + ), + src_ep=zigpy.types.uint8_t(GP_ENDPOINT), + dst=zigpy.types.AddrModeAddress( + addr_mode=zigpy.types.AddrMode.NWK, + address=self.state.node_info.nwk, + ), + dst_ep=zigpy.types.uint8_t(GP_ENDPOINT), + tsn=tsn, + profile_id=zigpy.types.uint16_t(GP_PROFILE_ID), + cluster_id=zigpy.types.uint16_t(GP_CLUSTER_ID), + data=zigpy.types.SerializableBytes(zcl_bytes), + lqi=zigpy.types.uint8_t(lqi), + rssi=zigpy.types.int8s(rssi), + ) + ) def _handle_frame( self, diff --git a/tests/test_application.py b/tests/test_application.py index 018dd632..21f7bddd 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -541,6 +541,213 @@ def test_frame_handler_ignored(app, aps_frame): assert app.packet_received.call_count == 0 +from zigpy.zcl import foundation # noqa: E402 +from zigpy.zcl.clusters.greenpower import ( # noqa: E402 + NotificationOptions, + NotificationSchema, +) +import zigpy.zgp.types as zgp_t # noqa: E402 + + +def _gp_addr_srcid(source_id: int, endpoint: int = 0x00) -> t.EmberGpAddress: + id_bytes = source_id.to_bytes(4, "little") + bytes(4) + return t.EmberGpAddress( + applicationId=t.uint8_t(0), + id=t.FixedList[t.uint8_t, 8](id_bytes), + endpoint=t.uint8_t(endpoint), + ) + + +def _gp_args_v13( + *, + source_id: int = 0x0171F886, + status: int = 0x7C, + gpd_link: int = 0xCD, + sequence_number: int = 0xEC, + security_level: int = 0, + security_key_type: int = 0, + frame_counter: int = 0xFFFFFFFF, + command_id: int = 0xE0, + payload: bytes = b"\x02\xc5\xf2" + bytes(43), +) -> list: + """Build the args list as delivered on EZSP v13/v14 (no trailer).""" + return [ + t.uint8_t(status), + t.uint8_t(gpd_link), + t.uint8_t(sequence_number), + _gp_addr_srcid(source_id), + t.EmberGpSecurityLevel(security_level), + t.EmberGpKeyType(security_key_type), + t.Bool(False), # autoCommissioning + t.uint8_t(0), # bidirectionalInfo + t.uint32_t(frame_counter), + t.uint8_t(command_id), + t.uint32_t(0xFFFFFFFF), # mic + t.uint8_t(0xFF), # proxyTableIndex + t.LVBytes(payload), + ] + + +def _expected_gp_packet( + *, + src_nwk, + dst_nwk, + source_id: int, + sequence_number: int, + command_id: int, + payload: bytes, + frame_counter: int, + lqi: int, + rssi: int, + security_level: int = 0, + security_key_type: int = 0, +) -> zigpy.types.ZigbeePacket: + options = NotificationOptions( + application_id=zgp_t.ApplicationID.SrcID, + also_unicast=0, + also_derived_group=0, + also_commissioned_group=0, + security_level=zgp_t.SecurityLevel(security_level), + security_key_type=zgp_t.SecurityKeyType(security_key_type), + appoint_temp_master=0, + tx_queue_full=0, + _reserved=0, + ) + notification = NotificationSchema( + options=options, + gpd_id=zgp_t.DeviceID(source_id), + frame_counter=zigpy.types.uint32_t(frame_counter), + command_id=zigpy.types.uint8_t(command_id), + payload=zigpy.types.LVBytes(payload), + ) + tsn = zigpy.types.uint8_t(sequence_number & 0xFF) + zcl_bytes = ( + foundation.ZCLHeader.cluster(tsn=tsn, command_id=0x00).serialize() + + notification.serialize() + ) + return zigpy.types.ZigbeePacket( + src=zigpy.types.AddrModeAddress( + addr_mode=zigpy.types.AddrMode.NWK, + address=zigpy.types.NWK(src_nwk), + ), + src_ep=zigpy.types.uint8_t(242), + dst=zigpy.types.AddrModeAddress( + addr_mode=zigpy.types.AddrMode.NWK, address=dst_nwk + ), + dst_ep=zigpy.types.uint8_t(242), + tsn=tsn, + profile_id=zigpy.types.uint16_t(0xA1E0), + cluster_id=zigpy.types.uint16_t(0x0021), + data=zigpy.types.SerializableBytes(zcl_bytes), + lqi=zigpy.types.uint8_t(lqi), + rssi=zigpy.types.int8s(rssi), + ) + + +def test_gp_frame_handler_forwards_notification(app): + """A GPDF is forwarded as the exact ZCL GP Notification packet.""" + app.ezsp_callback_handler("gpepIncomingMessageHandler", _gp_args_v13()) + + expected = _expected_gp_packet( + src_nwk=int(app.state.node_info.nwk), + dst_nwk=app.state.node_info.nwk, + source_id=0x0171F886, + sequence_number=0xEC, + command_id=0xE0, + payload=b"\x02\xc5\xf2" + bytes(43), + frame_counter=0xFFFFFFFF, + lqi=0xCD, + rssi=0, + ) + assert app.packet_received.mock_calls == [call(expected)] + assert ( + app.state.counters[bellows.zigbee.application.COUNTERS_CTRL][ + bellows.zigbee.application.COUNTER_RX_GP + ] + == 1 + ) + + +def test_gp_frame_handler_v16_uses_packet_info(app): + """On v16+, the trailing SlRxPacketInfo provides the proxy NWK + LQI/RSSI.""" + packet_info = t.SlRxPacketInfo( + sender_short_id=t.NWK(0x1234), + sender_long_id=t.EUI64.convert("00:11:22:33:44:55:66:77"), + binding_index=t.uint8_t(0xFF), + address_index=t.uint8_t(0xFF), + last_hop_lqi=t.uint8_t(210), + last_hop_rssi=t.int8s(-40), + last_hop_timestamp=t.uint32_t(0xDEADBEEF), + ) + args = _gp_args_v13() + [packet_info] + + app.ezsp_callback_handler("gpepIncomingMessageHandler", args) + + expected = _expected_gp_packet( + src_nwk=0x1234, + dst_nwk=app.state.node_info.nwk, + source_id=0x0171F886, + sequence_number=0xEC, + command_id=0xE0, + payload=b"\x02\xc5\xf2" + bytes(43), + frame_counter=0xFFFFFFFF, + lqi=210, + rssi=-40, + ) + assert app.packet_received.mock_calls == [call(expected)] + + +def test_gp_frame_handler_data_command_toggle(app): + """A non-commissioning command (Toggle) is forwarded the same way.""" + app.ezsp_callback_handler( + "gpepIncomingMessageHandler", + _gp_args_v13(command_id=0x22, payload=b"", frame_counter=42), + ) + + expected = _expected_gp_packet( + src_nwk=int(app.state.node_info.nwk), + dst_nwk=app.state.node_info.nwk, + source_id=0x0171F886, + sequence_number=0xEC, + command_id=0x22, + payload=b"", + frame_counter=42, + lqi=0xCD, + rssi=0, + ) + assert app.packet_received.mock_calls == [call(expected)] + + +def test_gp_frame_handler_ieee_application_id_dropped(app): + """IEEE-addressed GPDs are dropped: the zigpy GP stack only handles SrcID.""" + args = _gp_args_v13() + args[3] = t.EmberGpAddress( + applicationId=t.uint8_t(2), + id=t.FixedList[t.uint8_t, 8](bytes(8)), + endpoint=t.uint8_t(1), + ) + + app.ezsp_callback_handler("gpepIncomingMessageHandler", args) + + assert app.packet_received.call_count == 0 + + +def test_gp_frame_handler_short_args_dropped(app): + """Malformed callbacks never crash the dispatcher.""" + app.ezsp_callback_handler("gpepIncomingMessageHandler", [0x00, 0x00]) + assert app.packet_received.call_count == 0 + + +def test_gp_frame_handler_legacy_address_layout_dropped(app): + """EZSP < v13 delivers a uint8_t addrType, not an EmberGpAddress.""" + args = _gp_args_v13() + args[3] = t.uint8_t(0) # legacy addrType in place of EmberGpAddress + + app.ezsp_callback_handler("gpepIncomingMessageHandler", args) + + assert app.packet_received.call_count == 0 + + @pytest.mark.parametrize( "msg_type", ( diff --git a/tests/test_ezsp_v13.py b/tests/test_ezsp_v13.py index 69c5909f..016fb688 100644 --- a/tests/test_ezsp_v13.py +++ b/tests/test_ezsp_v13.py @@ -227,3 +227,101 @@ async def test_factory_reset(ezsp_f) -> None: assert ezsp_f.tokenFactoryReset.mock_calls == [ call(excludeOutgoingFC=False, excludeBootCounter=False) ] + + +# --------------------------------------------------------------------------- +# gpepIncomingMessageHandler (0x00C5) +# +# Real payload captured on 2026-04-22 from a Busch-Jaeger 6716 U +# "Friends of Hue" switch paired through a Silabs ZBT-1 stick running +# EZSP v13/v14. The raw bytes come straight from the bellows warning log +# reported in zigpy/zigpy#1814 before this fix landed. +# --------------------------------------------------------------------------- + + +# Payload without the EZSP frame envelope, as passed to ``deserialize_dict``. +BJ6716U_GPEP_PAYLOAD: bytes = bytes.fromhex( + "7ccdec" # status, gpdLink, sequenceNumber + "00" # EmberGpAddress.applicationId = 0 (SrcID) + "86f8710186f87101" # EmberGpAddress.id (8-byte union, + # sourceId = 0x0171F886 in first 4 LE) + "00" # EmberGpAddress.endpoint + "00" # gpdfSecurityLevel + "00" # gpdfSecurityKeyType + "00" # autoCommissioning + "00" # bidirectionalInfo + "ffffffff" # gpdSecurityFrameCounter (commissioning) + "e0" # gpdCommandId = 0xE0 (Commissioning) + "ffffffff" # mic + "ff" # proxyTableIndex + "2e" # LVBytes length = 46 + "02c5f2" # commissioning: deviceId, options, extOptions + "1ce9ae2f9e4f85f15de37c1ccbd94387" # encrypted GPD key (16 bytes) + "0013911a" # KeyMIC + "ec1d0000" # OutgoingCounter = 0x00001DEC + "04" # AppInfo + "11" # NumGPDCommands = 17 + "1011121314151617" # RecallScene 0-7 + "22" # Toggle + "6062636465666768" # Press/Release variants +) + + +def test_gpep_incoming_real_frame_bj6716u(): + """Parse the commissioning frame captured by a community tester. + + Before this schema override the v4 layout inherited up to v16 treated + the address as five scattered fields and ran off the end of the + buffer. The symptom was ``ValueError: Data is too short`` exactly as + reported in zigpy/zigpy#1814. + """ + _, _, rx_schema = bellows.ezsp.v13.commands.COMMANDS["gpepIncomingMessageHandler"] + result, rest = t.deserialize_dict(BJ6716U_GPEP_PAYLOAD, rx_schema) + + assert rest == b"" + # ``status`` comes through as a plain byte — the strict ``sl_GpStatus`` + # enum only covers 0x00..0x07 and would have dropped the whole frame. + assert result["status"] == 0x7C + assert result["gpdLink"] == 0xCD + assert result["sequenceNumber"] == 0xEC + + addr = result["addr"] + assert addr.applicationId == 0 + assert int.from_bytes(bytes(addr.id[:4]), "little") == 0x0171F886 + assert addr.endpoint == 0 + + assert result["gpdfSecurityLevel"] == 0 + assert result["gpdfSecurityKeyType"] == 0 + assert result["gpdSecurityFrameCounter"] == 0xFFFFFFFF + assert result["gpdCommandId"] == 0xE0 # GPD Commissioning + # Commissioning payload: deviceId=0x02, then options, then the 17 + # advertised GPD commands at the tail. + payload = result["gpdCommandPayload"] + assert len(payload) == 46 + assert payload[:3] == b"\x02\xc5\xf2" + assert payload[-8:] == bytes.fromhex("6062636465666768") + + +def test_gpep_incoming_via_frame_rx(ezsp_f): + """The same frame wrapped in the EZSP v13 envelope reaches the callback. + + Builds a synthetic EZSP frame (``seq`` + padding + ``frame_id`` LE + + payload) and feeds it to the protocol handler. The callback must be + dispatched with the parsed result — no ``Failed to parse frame`` + warning. + """ + envelope = ( + bytes([0x42, 0x00, 0x01]) # seq + control bytes + + t.uint16_t(0x00C5).serialize() # frame_id LE + + BJ6716U_GPEP_PAYLOAD + ) + + ezsp_f(envelope) + + assert ezsp_f._handle_callback.call_count == 1 + assert ezsp_f._handle_callback.call_args[0][0] == "gpepIncomingMessageHandler" + parsed = ezsp_f._handle_callback.call_args[0][1] + # ``parsed`` is the list of values in schema order. + assert parsed[0] == 0x7C # status + assert int.from_bytes(bytes(parsed[3].id[:4]), "little") == 0x0171F886 + assert parsed[9] == 0xE0 # gpdCommandId diff --git a/tests/test_ezsp_v14.py b/tests/test_ezsp_v14.py index 49bf152b..02b39182 100644 --- a/tests/test_ezsp_v14.py +++ b/tests/test_ezsp_v14.py @@ -226,3 +226,29 @@ async def test_send_broadcast(ezsp_f) -> None: message=b"hello", ) ] + + +def test_gpep_incoming_inherits_v13_schema(ezsp_f): + """v14 must pick up the v13 override of ``gpepIncomingMessageHandler``. + + The fix is declared in v13; v14 re-exports commands through the + ``_REPLACEMENTS`` loop, and the plain ``uint8_t`` status passes + through unchanged because the loop only rewrites ``EmberStatus`` and + ``EzspStatus``. Feeding the real Busch-Jaeger 6716 U capture must + dispatch the callback without warnings. + """ + from tests.test_ezsp_v13 import BJ6716U_GPEP_PAYLOAD + + envelope = ( + bytes([0x42, 0x00, 0x01]) + + t.uint16_t(0x00C5).serialize() + + BJ6716U_GPEP_PAYLOAD + ) + + ezsp_f(envelope) + + assert ezsp_f._handle_callback.call_count == 1 + assert ezsp_f._handle_callback.call_args[0][0] == "gpepIncomingMessageHandler" + parsed = ezsp_f._handle_callback.call_args[0][1] + assert int.from_bytes(bytes(parsed[3].id[:4]), "little") == 0x0171F886 + assert parsed[9] == 0xE0 # gpdCommandId diff --git a/tests/test_ezsp_v16.py b/tests/test_ezsp_v16.py index ee75c22b..e31f079c 100644 --- a/tests/test_ezsp_v16.py +++ b/tests/test_ezsp_v16.py @@ -31,3 +31,41 @@ def test_ezsp_frame_rx(ezsp_f): assert ezsp_f._handle_callback.call_count == 1 assert ezsp_f._handle_callback.call_args[0][0] == "version" assert ezsp_f._handle_callback.call_args[0][1] == [0x01, 0x02, 0x1234] + + +def test_gpep_incoming_v16_expects_trailing_packet_info(ezsp_f): + """v16 appends an ``SlRxPacketInfo`` struct after the LVBytes payload. + + zigbee-herdsman gates the read on ``version >= 0x10``. We simulate + that wire format by taking the v13 payload captured from a real + Busch-Jaeger 6716 U and appending a synthetic packet info trailer. + """ + from tests.test_ezsp_v13 import BJ6716U_GPEP_PAYLOAD + + packet_info = t.SlRxPacketInfo( + sender_short_id=t.NWK(0x1234), + sender_long_id=t.EUI64.convert("00:11:22:33:44:55:66:77"), + binding_index=0xFF, + address_index=0xFF, + last_hop_lqi=200, + last_hop_rssi=-45, + last_hop_timestamp=0xDEADBEEF, + ).serialize() + + envelope = ( + bytes([0x42, 0x00, 0x01]) + + t.uint16_t(0x00C5).serialize() + + BJ6716U_GPEP_PAYLOAD + + packet_info + ) + + ezsp_f(envelope) + + assert ezsp_f._handle_callback.call_count == 1 + assert ezsp_f._handle_callback.call_args[0][0] == "gpepIncomingMessageHandler" + parsed = ezsp_f._handle_callback.call_args[0][1] + assert int.from_bytes(bytes(parsed[3].id[:4]), "little") == 0x0171F886 + assert parsed[9] == 0xE0 # gpdCommandId + assert parsed[-1].sender_short_id == 0x1234 + assert parsed[-1].last_hop_lqi == 200 + assert parsed[-1].last_hop_rssi == -45 diff --git a/tests/test_types.py b/tests/test_types.py index a724d96a..8783865b 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -28,3 +28,14 @@ def test_ember_node_type_to_zdo_logical_type(node_type, logical_type): node_type = t.EmberNodeType(node_type) assert node_type.zdo_logical_type == zdo_t.LogicalType(logical_type) + + +def test_ember_gp_address_roundtrip(): + """Wire layout: applicationId(1) + id(8) + endpoint(1).""" + raw = b"\x00" + b"\x86\xf8\x71\x01" + bytes(4) + b"\x00" + addr, rest = t.EmberGpAddress.deserialize(raw) + assert rest == b"" + assert addr.applicationId == 0 + assert addr.endpoint == 0 + assert bytes(addr.id) == b"\x86\xf8\x71\x01" + bytes(4) + assert addr.serialize() == raw