Skip to content
Open
22 changes: 22 additions & 0 deletions bellows/ezsp/v13/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,28 @@
"status": t.sl_Status,
},
),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These verbose Claude comments are not useful, please remove them.

# status is kept as uint8_t (not sl_GpStatus) to accept values
# outside the strict 0x00..0x07 range that NCPs return for frames
# with no matching proxy/sink entry.
"gpepIncomingMessageHandler": (
0x00C5,
{},
{
"status": t.uint8_t,
"gpdLink": t.uint8_t,
"sequenceNumber": t.uint8_t,
"addr": t.EmberGpAddress,
"gpdfSecurityLevel": t.EmberGpSecurityLevel,
"gpdfSecurityKeyType": t.EmberGpKeyType,
"autoCommissioning": t.Bool,
"bidirectionalInfo": t.uint8_t,
"gpdSecurityFrameCounter": t.uint32_t,
"gpdCommandId": t.uint8_t,
"mic": t.uint32_t,
"proxyTableIndex": t.uint8_t,
"gpdCommandPayload": t.LVBytes,
},
),
}

del COMMANDS["becomeTrustCenter"] # this one was likely removed earlier
Expand Down
23 changes: 23 additions & 0 deletions bellows/ezsp/v16/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
import bellows.types as t

from ..v14.commands import COMMANDS as COMMANDS_v14

COMMANDS = {
**COMMANDS_v14,
# v16 appends an SlRxPacketInfo trailer to the v13/v14 layout.
"gpepIncomingMessageHandler": (
0x00C5,
{},
{
"status": t.uint8_t,
"gpdLink": t.uint8_t,
"sequenceNumber": t.uint8_t,
"addr": t.EmberGpAddress,
"gpdfSecurityLevel": t.EmberGpSecurityLevel,
"gpdfSecurityKeyType": t.EmberGpKeyType,
"autoCommissioning": t.Bool,
"bidirectionalInfo": t.uint8_t,
"gpdSecurityFrameCounter": t.uint32_t,
"gpdCommandId": t.uint8_t,
"mic": t.uint32_t,
"proxyTableIndex": t.uint8_t,
"gpdCommandPayload": t.LVBytes,
"packetInfo": t.SlRxPacketInfo,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested this with an adapter running EZSPv13 and EZSPv18 and parsing fails because packetInfo is not actually part of the command payload.

},
),
}
19 changes: 12 additions & 7 deletions bellows/types/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,19 @@ class EmberTokTypeStackZllSecurity(EzspStruct):


class EmberGpAddress(EzspStruct):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SDK source shows the definition as:

/**
 * @brief GPD Address for sending and receiving a GPDF.
 */
typedef struct {
  union {
    /** The IEEE address is used when the application identifier is
     *  ::SL_ZIGBEE_GP_APPLICATION_IEEE_ADDRESS.
     */
    sl_802154_long_addr_t gpdIeeeAddress;
    /** The 32-bit source identifier is used when the application identifier is
     *  ::SL_ZIGBEE_GP_APPLICATION_SOURCE_ID.
     */
    sl_zigbee_gp_source_id_t sourceId;
  } id;
  /** Application identifier of the GPD. */
  sl_zigbee_gp_application_id_t applicationId;
  /** Application endpoint , only used when application identifier is
   * ::SL_ZIGBEE_GP_APPLICATION_IEEE_ADDRESS
   */
  uint8_t endpoint;
} sl_zigbee_gp_address_t;

This differs from the field order in this struct.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried matching the SDK order but the captured frame from @dmatscheko test fails to parse: the EZSP wire layout serializes applicationId first, even though the C struct declares id first.

# A GP address structure.
# The GPD's EUI64.
gpdIeeeAddress: named.EUI64
# The GPD's source ID.
sourceId: basic.uint32_t
# The GPD Application ID.
"""sl_zigbee_gp_address_t: GPD address used by the GP callbacks.

The C SDK declares the union ``id`` field first, but the EZSP wire
layout for ``gpepIncomingMessageHandler`` serializes ``applicationId``
first. The order below matches what the NCP actually sends; see the
real-frame test in ``tests/test_ezsp_v13.py``.
"""

applicationId: basic.uint8_t
# The GPD endpoint.
# 8-byte union. When applicationId == 0 (SrcID), the first 4 bytes
# hold the 32-bit source ID (little-endian) and the rest is padding;
# when applicationId == 2 (IEEE), the 8 bytes are the GPD EUI64.
id: basic.FixedList[basic.uint8_t, 8]
endpoint: basic.uint8_t


Expand Down
113 changes: 113 additions & 0 deletions bellows/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import zigpy.state
import zigpy.types
import zigpy.util
from zigpy.zcl import foundation
from zigpy.zcl.clusters.greenpower import NotificationOptions, NotificationSchema
import zigpy.zdo.types as zdo_t
import zigpy.zgp.types as zgp_t

import bellows
from bellows.config import (
Expand Down Expand Up @@ -56,6 +59,7 @@
COUNTER_RX_BCAST = "broadcast_rx"
COUNTER_RX_MCAST = "multicast_rx"
COUNTER_RX_UNICAST = "unicast_rx"
COUNTER_RX_GP = "green_power_rx"
COUNTER_UNKNOWN_DEVICE = "unknown_device_rx"
COUNTER_WATCHDOG = "watchdog_reset_requests"
COUNTERS_EZSP = "ezsp_counters"
Expand All @@ -75,6 +79,12 @@

DEFAULT_TX_POWER = 8 # dBm

# TODO: replace with zigpy.zgp.GP_ENDPOINT / GP_CLUSTER_ID / GP_PROFILE_ID
# once those are released upstream.
GP_ENDPOINT = 242
GP_CLUSTER_ID = 0x0021
GP_PROFILE_ID = 0xA1E0

LIB_VERSION = importlib.metadata.version("bellows")
LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -696,6 +706,109 @@ def ezsp_callback_handler(self, frame_name, args):
self.handle_route_error(status, nwk)
elif frame_name == "idConflictHandler":
self._handle_id_conflict(*args)
elif frame_name == "gpepIncomingMessageHandler":
self._handle_gp_frame(args)

def _handle_gp_frame(self, args: tuple) -> None:
"""Forward gpepIncomingMessageHandler as a ZCL GP Notification."""
if len(args) < 13:
LOGGER.debug("gpepIncomingMessageHandler: short args %r, dropping", args)
return

(
_status,
gpd_link,
sequence_number,
addr,
gpdf_security_level,
gpdf_security_key_type,
_auto_commissioning,
_bidirectional_info,
gpd_security_frame_counter,
gpd_command_id,
_mic,
_proxy_table_index,
gpd_command_payload,
*rest,
) = args

# On EZSP < v13 the 4th argument is a uint8_t addrType, not an
# EmberGpAddress. Drop those frames rather than crashing on
# attribute access; bellows does not currently parse the legacy
# layout.
if not isinstance(addr, t.EmberGpAddress):
LOGGER.debug(
"gpepIncomingMessageHandler: unsupported legacy address layout, "
"dropping"
)
return

if addr.applicationId != zgp_t.ApplicationID.SrcID:
LOGGER.debug(
Comment on lines +714 to +747
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_handle_gp_frame() assumes the v13+ callback layout (where args[3] is an EmberGpAddress). On older EZSP versions (e.g. v4 schema), gpepIncomingMessageHandler’s 4th argument is addrType (a uint8_t), so addr.applicationId will raise AttributeError and can crash the callback dispatcher. Add a version/shape guard (e.g. isinstance(addr, t.EmberGpAddress)) and either translate the legacy layout into an EmberGpAddress or drop/log the frame safely.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken into account in 2df0ea5
Added isinstance(addr, t.EmberGpAddress) guard with a debug log and a regression test

"GP frame with unsupported applicationId %s, dropping",
addr.applicationId,
)
return

source_id = int.from_bytes(bytes(addr.id[:4]), "little")
# v16+ appends an SlRxPacketInfo after the payload.
packet_info = rest[0] if rest else None

options = NotificationOptions(
application_id=zgp_t.ApplicationID(addr.applicationId),
also_unicast=0,
also_derived_group=0,
also_commissioned_group=0,
security_level=zgp_t.SecurityLevel(gpdf_security_level),
security_key_type=zgp_t.SecurityKeyType(gpdf_security_key_type),
appoint_temp_master=0,
tx_queue_full=0,
_reserved=0,
)
notification = NotificationSchema(
options=options,
gpd_id=zgp_t.DeviceID(source_id),
frame_counter=zigpy.types.uint32_t(gpd_security_frame_counter),
command_id=zigpy.types.uint8_t(gpd_command_id),
payload=zigpy.types.LVBytes(bytes(gpd_command_payload)),
)
tsn = zigpy.types.uint8_t(int(sequence_number) & 0xFF)
zcl_header = foundation.ZCLHeader.cluster(tsn=tsn, command_id=0x00)
zcl_bytes = zcl_header.serialize() + notification.serialize()

if packet_info is not None:
proxy_nwk = int(packet_info.sender_short_id)
lqi = int(packet_info.last_hop_lqi)
rssi = int(packet_info.last_hop_rssi)
else:
# No SlRxPacketInfo on v13/v14: the coordinator stands in as
# the proxy and gpdLink is the only signal-quality byte.
proxy_nwk = int(self.state.node_info.nwk)
lqi = int(gpd_link)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the v13/v14 fallback when no SlRxPacketInfo trailer is present.
Would you rather drop a synthetic LQI/RSSI on v13/v14 (e.g. lqi=0, rssi=0) ?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment added in 2df0ea5

rssi = 0

self.state.counters[COUNTERS_CTRL][COUNTER_RX_GP].increment()

self.packet_received(
zigpy.types.ZigbeePacket(
src=zigpy.types.AddrModeAddress(
addr_mode=zigpy.types.AddrMode.NWK,
address=zigpy.types.NWK(proxy_nwk),
),
src_ep=zigpy.types.uint8_t(GP_ENDPOINT),
dst=zigpy.types.AddrModeAddress(
addr_mode=zigpy.types.AddrMode.NWK,
address=self.state.node_info.nwk,
),
dst_ep=zigpy.types.uint8_t(GP_ENDPOINT),
tsn=tsn,
profile_id=zigpy.types.uint16_t(GP_PROFILE_ID),
cluster_id=zigpy.types.uint16_t(GP_CLUSTER_ID),
data=zigpy.types.SerializableBytes(zcl_bytes),
lqi=zigpy.types.uint8_t(lqi),
rssi=zigpy.types.int8s(rssi),
)
)

def _handle_frame(
self,
Expand Down
Loading
Loading