diff --git a/libfabric.vcxproj b/libfabric.vcxproj
index 76d1f73ba48..32ef3aa32f4 100644
--- a/libfabric.vcxproj
+++ b/libfabric.vcxproj
@@ -906,6 +906,8 @@
+
+
@@ -1025,6 +1027,8 @@
+
+
diff --git a/prov/efa/Makefile.include b/prov/efa/Makefile.include
index 3d7780e389f..54eb60331e1 100644
--- a/prov/efa/Makefile.include
+++ b/prov/efa/Makefile.include
@@ -79,7 +79,9 @@ _efa_files = \
prov/efa/src/rdm/efa_rdm_tracepoint_def.c \
prov/efa/src/rdm/efa_rdm_srx.c \
prov/efa/src/rdm/efa_rdm_util.c \
- prov/efa/src/rdm/efa_rdm_mr.c
+ prov/efa/src/rdm/efa_rdm_mr.c \
+ prov/efa/src/rdm/efa_rdm_proto.c \
+ prov/efa/src/rdm/efa_rdm_proto_eager.c
if ENABLE_EFA_UNIT_TEST
_efa_files += prov/efa/test/efa_unit_test_data_path_ops.c
@@ -142,7 +144,9 @@ _efa_headers = \
prov/efa/src/rdm/efa_rdm_tracepoint.h \
prov/efa/src/rdm/efa_rdm_srx.h \
prov/efa/src/rdm/efa_rdm_util.h \
- prov/efa/src/rdm/efa_rdm_mr.h
+ prov/efa/src/rdm/efa_rdm_mr.h \
+ prov/efa/src/rdm/efa_rdm_proto.h \
+ prov/efa/src/rdm/efa_rdm_proto_eager.h
if HAVE_LTTNG
efa_LDFLAGS += -llttng-ust
@@ -176,7 +180,8 @@ nodist_prov_efa_test_efa_unit_test_SOURCES = \
prov/efa/test/efa_unit_test_msg.c \
prov/efa/test/efa_unit_test_rma.c \
prov/efa/test/efa_unit_test_rdm_rma.c \
- prov/efa/test/efa_unit_test_data_path_direct.c
+ prov/efa/test/efa_unit_test_data_path_direct.c \
+ prov/efa/test/efa_unit_test_proto.c
efa_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/prov/efa/test $(cmocka_CPPFLAGS)
diff --git a/prov/efa/src/rdm/efa_rdm_cq.c b/prov/efa/src/rdm/efa_rdm_cq.c
index cbe99aa2c0c..d656810157c 100644
--- a/prov/efa/src/rdm/efa_rdm_cq.c
+++ b/prov/efa/src/rdm/efa_rdm_cq.c
@@ -804,8 +804,36 @@ enum ibv_wc_status efa_rdm_cq_process_wc(struct efa_ibv_cq *cq, struct efa_rdm_e
#if ENABLE_DEBUG
ep->send_comps++;
#endif
- efa_rdm_pke_handle_send_completion(pkt_entry);
- efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
+ if (pkt_entry->handle_pke) {
+ efa_rdm_ep_record_tx_op_completed(pkt_entry->ep,
+ pkt_entry);
+ /*
+ * For a send completion, pkt_entry->peer can be
+ * NULL in 3 situations:
+ * 1. the pkt_entry is used for a local read
+ * operation
+ * 2. a new peer with same gid+qpn was inserted
+ * to av, thus the peer was removed from AV.
+ * 3. application removed the peer's address
+ * from av. In 1, we should proceed. For 2 and
+ * 3, the send completion should be ignored.
+ */
+ if (!pkt_entry->peer &&
+ !(pkt_entry->flags &
+ EFA_RDM_PKE_LOCAL_READ)) {
+ EFA_WARN(
+ FI_LOG_CQ,
+ "ignoring send completion of a "
+ "packet to a removed peer.\n");
+ efa_rdm_pke_release_tx(pkt_entry);
+ } else {
+ pkt_entry->handle_pke(pkt_entry);
+ }
+ efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
+ } else {
+ efa_rdm_pke_handle_send_completion(pkt_entry);
+ efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
+ }
break;
case IBV_WC_RECV:
/* efa_rdm_cq_handle_recv_completion does additional work to determine the source
diff --git a/prov/efa/src/rdm/efa_rdm_msg.c b/prov/efa/src/rdm/efa_rdm_msg.c
index ab5e0fb8f63..70d7ac70d1e 100644
--- a/prov/efa/src/rdm/efa_rdm_msg.c
+++ b/prov/efa/src/rdm/efa_rdm_msg.c
@@ -18,6 +18,9 @@
#include "efa_rdm_pke_utils.h"
#include "efa_rdm_pke_req.h"
+#include "efa_mr.h"
+#include "efa_rdm_proto.h"
+#include "efa_rdm_proto_eager.h"
#include "efa_rdm_tracepoint.h"
/**
@@ -102,14 +105,42 @@ int efa_rdm_msg_select_rtm(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *tx
}
/**
- * @brief post RTM packet(s) for a send operation
+ * @brief Post an already-filled TXE using the new protocol path.
+ *
+ * Used by the retry path after handshake completes and by the normal
+ * send path. The TXE must already be filled by efa_rdm_proto_txe_fill.
+ */
+ssize_t efa_rdm_msg_post_rtm_proto(struct efa_rdm_ep *ep,
+ struct efa_rdm_ope *txe,
+ struct efa_rdm_proto *proto)
+{
+ ssize_t err;
+ uint64_t pke_send_flags = 0;
+
+ err = proto->construct_tx_pkes(
+ ep, txe->peer, NULL, txe->op, txe->tag,
+ txe->fi_flags, txe->internal_flags, txe);
+ if (err)
+ return err;
+
+ err = efa_rdm_pke_sendv(ep->send_pkt_entry_vec,
+ ep->send_pkt_entry_vec_size,
+ pke_send_flags);
+ if (err)
+ return err;
+
+ proto->handle_tx_pkes_posted(ep, txe);
+ return FI_SUCCESS;
+}
+
+/**
+ * @brief Post a RTM packet for a TX entry using the old code path.
*
* @param[in,out] ep endpoint
* @param[in,out] txe information of the send operation.
* @retval 0 if packet(s) was posted successfully.
* @retval -FI_ENOSUPP if the send operation requires an extra feature,
* which peer does not support.
- * @retval -FI_EAGAIN for temporary out of resources for send
*/
ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
{
@@ -168,6 +199,7 @@ ssize_t efa_rdm_msg_generic_send(struct efa_rdm_ep *ep, const struct fi_msg *msg
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
size_t available_tx_pkts;
+ struct efa_rdm_proto *proto;
efa_rdm_tracepoint(send_begin_msg_context,
(size_t) msg->context, (size_t) msg->addr);
@@ -196,6 +228,47 @@ ssize_t efa_rdm_msg_generic_send(struct efa_rdm_ep *ep, const struct fi_msg *msg
goto out;
}
+ /* First try to use the refactored code path */
+ err = efa_rdm_proto_select_send_protocol(ep, peer, msg, op, fi_flags, txe,
+ &proto);
+ if (err)
+ goto out;
+
+ /* If a protocol is found, use it. Otherwise, fall back to the old code
+ * path */
+ if (proto) {
+ efa_rdm_proto_txe_fill(txe, ep, peer, msg, op, tag, fi_flags,
+ internal_flags);
+ txe->msg_id = peer->next_msg_id++;
+
+ /*
+ * For backwards compatibility: if the peer may have zero-copy
+ * receive enabled, we must complete handshake before sending so
+ * we can discover the peer's user_recv_qp and route packets
+ * accordingly.
+ */
+ if (ep->peer_may_have_zcpy_rx &&
+ !(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
+ err = efa_rdm_ep_enforce_handshake_for_txe(ep, txe);
+ if (err) {
+ efa_rdm_txe_release(txe);
+ peer->next_msg_id--;
+ }
+ goto out;
+ }
+
+ err = efa_rdm_msg_post_rtm_proto(ep, txe, proto);
+ if (err) {
+ efa_rdm_txe_release(txe);
+ peer->next_msg_id--;
+ goto out;
+ }
+
+ peer->flags |= EFA_RDM_PEER_REQ_SENT;
+ goto out;
+ }
+
+ /* Fallback to the old code path */
efa_rdm_txe_construct(txe, ep, peer, msg, op, fi_flags, internal_flags);
if (op == ofi_op_tagged) {
txe->cq_entry.tag = tag;
diff --git a/prov/efa/src/rdm/efa_rdm_msg.h b/prov/efa/src/rdm/efa_rdm_msg.h
index ddda076fd84..7c34d7fbc25 100644
--- a/prov/efa/src/rdm/efa_rdm_msg.h
+++ b/prov/efa/src/rdm/efa_rdm_msg.h
@@ -1,6 +1,7 @@
/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
+struct efa_rdm_proto;
static inline
void efa_rdm_msg_construct(struct fi_msg *msg, const struct iovec *iov, void **desc,
size_t count, fi_addr_t addr, void *context, uint64_t data)
@@ -29,6 +30,32 @@ struct efa_rdm_ope *efa_rdm_msg_split_rxe(struct efa_rdm_ep *ep,
struct efa_rdm_ope *consumer_entry,
struct efa_rdm_pke *pkt_entry);
ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe);
+
+/**
+ * @brief Compute the effective fi_flags for a TX operation.
+ *
+ * Merges per-operation flags with the endpoint's tx_op_flags, respecting
+ * the tx_msg_flags setting for FI_COMPLETION.
+ *
+ * @param[in] ep RDM endpoint
+ * @param[in] fi_flags Per-operation flags from the application
+ * @return Merged flags to use for the TX operation
+ */
+static inline
+uint64_t efa_rdm_msg_get_tx_flags(struct efa_rdm_ep *ep, uint64_t fi_flags)
+{
+ uint64_t tx_op_flags;
+
+ assert(ep->base_ep.util_ep.tx_msg_flags == 0 ||
+ ep->base_ep.util_ep.tx_msg_flags == FI_COMPLETION);
+ tx_op_flags = ep->base_ep.util_ep.tx_op_flags;
+ if (ep->base_ep.util_ep.tx_msg_flags == 0)
+ tx_op_flags &= ~FI_COMPLETION;
+ return fi_flags | tx_op_flags;
+}
+
+ssize_t efa_rdm_msg_post_rtm_proto(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe,
+ struct efa_rdm_proto *proto);
/*
* The following 2 OP structures are defined in efa_rdm_msg.c and is
* used by #efa_rdm_ep_open()
diff --git a/prov/efa/src/rdm/efa_rdm_ope.c b/prov/efa/src/rdm/efa_rdm_ope.c
index 553c2a4cc1b..e719d02bd36 100644
--- a/prov/efa/src/rdm/efa_rdm_ope.c
+++ b/prov/efa/src/rdm/efa_rdm_ope.c
@@ -47,6 +47,8 @@ void efa_rdm_txe_construct(struct efa_rdm_ope *txe,
txe->efa_outstanding_tx_ops = 0;
dlist_init(&txe->queued_pkts);
+ txe->proto = NULL;
+
memcpy(txe->iov, msg->msg_iov, sizeof(struct iovec) * msg->iov_count);
memset(txe->mr, 0, sizeof(*txe->mr) * msg->iov_count);
if (msg->desc) {
@@ -1956,6 +1958,8 @@ ssize_t efa_rdm_ope_repost_ope_queued_before_handshake(struct efa_rdm_ope *ope)
switch (ope->op) {
case ofi_op_msg: /* fall through */
case ofi_op_tagged:
+ if (ope->proto)
+ return efa_rdm_msg_post_rtm_proto(ope->ep, ope, ope->proto);
return efa_rdm_msg_post_rtm(ope->ep, ope);
case ofi_op_write:
return efa_rdm_rma_post_write(ope->ep, ope);
diff --git a/prov/efa/src/rdm/efa_rdm_ope.h b/prov/efa/src/rdm/efa_rdm_ope.h
index 25c65b356ce..d37e19e2419 100644
--- a/prov/efa/src/rdm/efa_rdm_ope.h
+++ b/prov/efa/src/rdm/efa_rdm_ope.h
@@ -72,6 +72,7 @@ struct efa_rdm_ope {
struct efa_rdm_ep *ep;
struct efa_rdm_peer *peer;
+ struct efa_rdm_proto *proto; /**< protocol used by the refactored send path; NULL in the old code path */
uint32_t tx_id;
uint32_t rx_id;
diff --git a/prov/efa/src/rdm/efa_rdm_pke.c b/prov/efa/src/rdm/efa_rdm_pke.c
index e45456e2cce..136558ab6f7 100644
--- a/prov/efa/src/rdm/efa_rdm_pke.c
+++ b/prov/efa/src/rdm/efa_rdm_pke.c
@@ -109,6 +109,7 @@ struct efa_rdm_pke *efa_rdm_pke_alloc(struct efa_rdm_ep *ep,
pkt_entry->payload_size = 0;
pkt_entry->payload_mr = NULL;
pkt_entry->peer = NULL;
+ pkt_entry->handle_pke = NULL;
switch (alloc_type) {
case EFA_RDM_PKE_FROM_READ_COPY_POOL:
diff --git a/prov/efa/src/rdm/efa_rdm_pke.h b/prov/efa/src/rdm/efa_rdm_pke.h
index ec5c44624cb..40dde76d796 100644
--- a/prov/efa/src/rdm/efa_rdm_pke.h
+++ b/prov/efa/src/rdm/efa_rdm_pke.h
@@ -245,6 +245,9 @@ struct efa_rdm_pke {
/**@brief Generation counter. It is incremented every time the packet is posted to rdma-core */
uint8_t gen;
+ /**@brief Callback function called in TX and RX paths */
+ void (*handle_pke)(struct efa_rdm_pke *pkt_entry);
+
#if ENABLE_DEBUG
struct efa_rdm_pke_debug_info_buffer *debug_info; /**< Pointer to debug info buffer */
#endif
diff --git a/prov/efa/src/rdm/efa_rdm_pke_cmd.c b/prov/efa/src/rdm/efa_rdm_pke_cmd.c
index 8976df182bc..c7352f64149 100644
--- a/prov/efa/src/rdm/efa_rdm_pke_cmd.c
+++ b/prov/efa/src/rdm/efa_rdm_pke_cmd.c
@@ -91,12 +91,8 @@ int efa_rdm_pke_fill_data(struct efa_rdm_pke *pkt_entry,
ret = efa_rdm_pke_init_receipt(pkt_entry, ope);
break;
case EFA_RDM_EAGER_MSGRTM_PKT:
- assert(data_offset == 0 && data_size == -1);
- ret = efa_rdm_pke_init_eager_msgrtm(pkt_entry, ope);
- break;
case EFA_RDM_EAGER_TAGRTM_PKT:
- assert(data_offset == 0 && data_size == -1);
- ret = efa_rdm_pke_init_eager_tagrtm(pkt_entry, ope);
+ assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_MEDIUM_MSGRTM_PKT:
assert(data_offset >= 0 && data_size > 0);
@@ -173,12 +169,8 @@ int efa_rdm_pke_fill_data(struct efa_rdm_pke *pkt_entry,
ret = efa_rdm_pke_init_compare_rta(pkt_entry, ope);
break;
case EFA_RDM_DC_EAGER_MSGRTM_PKT:
- assert(data_offset == 0 && data_size == -1);
- ret = efa_rdm_pke_init_dc_eager_msgrtm(pkt_entry, ope);
- break;
case EFA_RDM_DC_EAGER_TAGRTM_PKT:
- assert(data_offset == 0 && data_size == -1);
- ret = efa_rdm_pke_init_dc_eager_tagrtm(pkt_entry, ope);
+ assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_DC_MEDIUM_MSGRTM_PKT:
assert(data_offset >= 0 && data_size > 0);
@@ -266,7 +258,7 @@ void efa_rdm_pke_handle_sent(struct efa_rdm_pke *pkt_entry, int pkt_type, struct
break;
case EFA_RDM_EAGER_MSGRTM_PKT:
case EFA_RDM_EAGER_TAGRTM_PKT:
- /* nothing to do */
+ assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_MEDIUM_MSGRTM_PKT:
case EFA_RDM_MEDIUM_TAGRTM_PKT:
@@ -310,8 +302,10 @@ void efa_rdm_pke_handle_sent(struct efa_rdm_pke *pkt_entry, int pkt_type, struct
break;
case EFA_RDM_DC_EAGER_MSGRTM_PKT:
case EFA_RDM_DC_EAGER_TAGRTM_PKT:
+ assert(0 && "Eager protocol moved to refactored code path");
+ break;
case EFA_RDM_DC_EAGER_RTW_PKT:
- /* nothing to do for DC EAGER RTM/RTW */
+ /* nothing to do for DC EAGER RTW */
break;
case EFA_RDM_CTSDATA_PKT:
efa_rdm_pke_handle_ctsdata_sent(pkt_entry);
@@ -553,13 +547,6 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
return;
}
- /* These pkts are eager pkts withour hdrs */
- if (pkt_entry->flags & EFA_RDM_PKE_SEND_TO_USER_RECV_QP) {
- efa_rdm_pke_handle_eager_rtm_send_completion(pkt_entry);
- efa_rdm_pke_release_tx(pkt_entry);
- return;
- }
-
/* Start handling pkts with hdrs */
switch (efa_rdm_pkt_type_of(pkt_entry)) {
case EFA_RDM_HANDSHAKE_PKT:
@@ -592,7 +579,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
break;
case EFA_RDM_EAGER_MSGRTM_PKT:
case EFA_RDM_EAGER_TAGRTM_PKT:
- efa_rdm_pke_handle_eager_rtm_send_completion(pkt_entry);
+ assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_MEDIUM_MSGRTM_PKT:
case EFA_RDM_MEDIUM_TAGRTM_PKT:
@@ -637,6 +624,8 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
break;
case EFA_RDM_DC_EAGER_MSGRTM_PKT:
case EFA_RDM_DC_EAGER_TAGRTM_PKT:
+ assert(0 && "Eager protocol moved to refactored code path");
+ break;
case EFA_RDM_DC_MEDIUM_MSGRTM_PKT:
case EFA_RDM_DC_MEDIUM_TAGRTM_PKT:
case EFA_RDM_DC_EAGER_RTW_PKT:
diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtm.c b/prov/efa/src/rdm/efa_rdm_pke_rtm.c
index 5e0a7e5b3de..3b588556b6a 100644
--- a/prov/efa/src/rdm/efa_rdm_pke_rtm.c
+++ b/prov/efa/src/rdm/efa_rdm_pke_rtm.c
@@ -79,16 +79,16 @@ size_t efa_rdm_pke_get_rtm_msg_length(struct efa_rdm_pke *pkt_entry)
*
* @param[in,out] pkt_entry RTM packet entry
* @param[in] pkt_type RTM packet type
- * @param[in] txe TX entry that has user buffer information
+ * @param[in] txe TX entry that has user buffer
+ * information
* @param[in] segmment_offset data offset in respect of user buffer
- * @param[in] data_size user data size. If it is -1, the function
- * will select data size based on maximum
- * data capacity of packet entry.
+ * @param[in] data_size user data size. If it is -1, the
+ * function will select data size based on maximum data capacity of packet
+ * entry.
* @returns
* 0 on success
* negative libfabric error code for failure.
*/
-static inline
ssize_t efa_rdm_pke_init_rtm_with_payload(struct efa_rdm_pke *pkt_entry,
int pkt_type,
struct efa_rdm_ope *txe,
@@ -534,134 +534,6 @@ void efa_rdm_pke_handle_rtm_rta_recv(struct efa_rdm_pke *pkt_entry)
efa_rdm_peer_proc_pending_items_in_robuf(peer, ep);
}
-/**
- * @brief construct a eager msgrtm pkt without hdr
- *
- * @param[in,out] pkt_entry pkt to be initialized
- * @param[in] txe TX entry
- */
-static inline
-ssize_t efa_rdm_pke_init_eager_msgrtm_zero_hdr(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe)
-{
- pkt_entry->ope = txe;
- pkt_entry->peer = txe->peer;
-
- return efa_rdm_pke_init_payload_from_ope(pkt_entry, txe,
- 0, 0, txe->total_len);
-}
-
-/**
- * @brief initialzie a EFA_RDM_EAGER_MSGRTM pacekt entry
- *
- * @param[in,out] pkt_entry EFA_RDM_EAGER_MSGRTM to be initialized
- * @param[in] txe TX entry
- */
-ssize_t efa_rdm_pke_init_eager_msgrtm(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe)
-{
- int ret;
-
- if (pkt_entry->flags & EFA_RDM_PKE_HAS_NO_BASE_HDR)
- ret = efa_rdm_pke_init_eager_msgrtm_zero_hdr(pkt_entry, txe);
- else
- ret = efa_rdm_pke_init_rtm_with_payload(pkt_entry,
- EFA_RDM_EAGER_MSGRTM_PKT,
- txe, 0, -1);
- if (ret)
- return ret;
-
- assert(txe->total_len == pkt_entry->payload_size);
- return 0;
-}
-
-/**
- * @brief initialize a EFA_RDM_EAGER_TAGRTM packet entry
- * @param[in,out] pkt_entry EFA_RDM_EAGER_TAGRTM to be initialized
- * @param[in] txe TX entry
- */
-ssize_t efa_rdm_pke_init_eager_tagrtm(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe)
-{
- struct efa_rdm_base_hdr *base_hdr;
- int ret;
-
- ret = efa_rdm_pke_init_rtm_with_payload(pkt_entry, EFA_RDM_EAGER_TAGRTM_PKT, txe, 0, -1);
- if (ret)
- return ret;
- assert(txe->total_len == pkt_entry->payload_size);
- base_hdr = efa_rdm_pke_get_base_hdr(pkt_entry);
- base_hdr->flags |= EFA_RDM_REQ_TAGGED;
- efa_rdm_pke_set_rtm_tag(pkt_entry, txe->tag);
- return 0;
-}
-
-/**
- * @brief initialzie a EFA_RDM_DC_EAGER_MSGRTM pacekt entry
- *
- * @param[in,out] pkt_entry EFA_RDM_DC_EAGER_MSGRTM to be initialized
- * @param[in] txe TX entry
- */
-ssize_t efa_rdm_pke_init_dc_eager_msgrtm(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe)
-
-{
- struct efa_rdm_dc_eager_msgrtm_hdr *dc_eager_msgrtm_hdr;
- int ret;
-
- txe->internal_flags |= EFA_RDM_TXE_DELIVERY_COMPLETE_REQUESTED;
- ret = efa_rdm_pke_init_rtm_with_payload(pkt_entry, EFA_RDM_DC_EAGER_MSGRTM_PKT, txe, 0, -1);
- if (ret)
- return ret;
- dc_eager_msgrtm_hdr = efa_rdm_pke_get_dc_eager_msgrtm_hdr(pkt_entry);
- dc_eager_msgrtm_hdr->hdr.send_id = txe->tx_id;
- return 0;
-}
-
-/**
- * @brief initialize a EFA_RDM_DC_EAGER_TAGRTM pacekt entry
- *
- * @param[in,out] pkt_entry EFA_RDM_DC_EAGER_TAGRTM to be initialized
- * @param[in] txe TX entry
- */
-ssize_t efa_rdm_pke_init_dc_eager_tagrtm(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe)
-{
- struct efa_rdm_base_hdr *base_hdr;
- struct efa_rdm_dc_eager_tagrtm_hdr *dc_eager_tagrtm_hdr;
- int ret;
-
- txe->internal_flags |= EFA_RDM_TXE_DELIVERY_COMPLETE_REQUESTED;
- ret = efa_rdm_pke_init_rtm_with_payload(pkt_entry, EFA_RDM_DC_EAGER_TAGRTM_PKT, txe, 0, -1);
- if (ret)
- return ret;
- base_hdr = efa_rdm_pke_get_base_hdr(pkt_entry);
- base_hdr->flags |= EFA_RDM_REQ_TAGGED;
- efa_rdm_pke_set_rtm_tag(pkt_entry, txe->tag);
-
- dc_eager_tagrtm_hdr = efa_rdm_pke_get_dc_eager_tagrtm_hdr(pkt_entry);
- dc_eager_tagrtm_hdr->hdr.send_id = txe->tx_id;
- return 0;
-}
-
-/**
- * @brief handle the event that an EAGER RTM has send completed
- *
- * @details
- * This function applies to EAGER_MSGRTM and EAGER_TAGRTM, it
- * does not apply to DC_EAGER_MSGRTM and DC_EAGER_TAGRTM
- *
- * @param[in,out] pkt_entry EAGER_MSGRTM or EAGER_TAGRTM packet entry
- */
-void efa_rdm_pke_handle_eager_rtm_send_completion(struct efa_rdm_pke *pkt_entry)
-{
- struct efa_rdm_ope *txe;
-
- txe = pkt_entry->ope;
- assert(txe->total_len == pkt_entry->payload_size);
- efa_rdm_ope_handle_send_completed(txe);
-}
-
/**
* @brief process a matched eager rtm packet entry
*
diff --git a/prov/efa/src/rdm/efa_rdm_pke_rtm.h b/prov/efa/src/rdm/efa_rdm_pke_rtm.h
index ced0ee4bce1..41a7f79affd 100644
--- a/prov/efa/src/rdm/efa_rdm_pke_rtm.h
+++ b/prov/efa/src/rdm/efa_rdm_pke_rtm.h
@@ -104,18 +104,6 @@ struct efa_rdm_dc_eager_rtm_base_hdr *efa_rdm_pke_get_dc_eager_rtm_base_hdr(stru
return (struct efa_rdm_dc_eager_rtm_base_hdr *)pke->wiredata;
}
-static inline
-struct efa_rdm_dc_eager_msgrtm_hdr *efa_rdm_pke_get_dc_eager_msgrtm_hdr(struct efa_rdm_pke *pke)
-{
- return (struct efa_rdm_dc_eager_msgrtm_hdr *)pke->wiredata;
-}
-
-static inline
-struct efa_rdm_dc_eager_tagrtm_hdr *efa_rdm_pke_get_dc_eager_tagrtm_hdr(struct efa_rdm_pke *pke)
-{
- return (struct efa_rdm_dc_eager_tagrtm_hdr *)pke->wiredata;
-}
-
static inline
struct efa_rdm_medium_rtm_base_hdr *efa_rdm_pke_get_medium_rtm_base_hdr(struct efa_rdm_pke *pke)
{
@@ -158,20 +146,6 @@ struct efa_rdm_runtread_rtm_base_hdr *efa_rdm_pke_get_runtread_rtm_base_hdr(stru
return (struct efa_rdm_runtread_rtm_base_hdr *)pke->wiredata;
}
-ssize_t efa_rdm_pke_init_eager_msgrtm(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe);
-
-ssize_t efa_rdm_pke_init_eager_tagrtm(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe);
-
-ssize_t efa_rdm_pke_init_dc_eager_msgrtm(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe);
-
-ssize_t efa_rdm_pke_init_dc_eager_tagrtm(struct efa_rdm_pke *pkt_entry,
- struct efa_rdm_ope *txe);
-
-void efa_rdm_pke_handle_eager_rtm_send_completion(struct efa_rdm_pke *pkt_entry);
-
ssize_t efa_rdm_pke_proc_matched_eager_rtm(struct efa_rdm_pke *pkt_entry);
ssize_t efa_rdm_pke_init_medium_msgrtm(struct efa_rdm_pke *pkt_entry,
@@ -240,4 +214,8 @@ void efa_rdm_pke_handle_runtread_rtm_sent(struct efa_rdm_pke *pkt_entry, struct
void efa_rdm_pke_handle_runtread_rtm_send_completion(struct efa_rdm_pke *pkt_entry);
+ssize_t efa_rdm_pke_init_rtm_with_payload(struct efa_rdm_pke *pkt_entry,
+ int pkt_type, struct efa_rdm_ope *txe,
+ size_t segment_offset, int data_size);
+
#endif
\ No newline at end of file
diff --git a/prov/efa/src/rdm/efa_rdm_proto.c b/prov/efa/src/rdm/efa_rdm_proto.c
new file mode 100644
index 00000000000..9077c2eca44
--- /dev/null
+++ b/prov/efa/src/rdm/efa_rdm_proto.c
@@ -0,0 +1,198 @@
+/* Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+
+#include "efa_rdm_proto.h"
+#include "efa.h"
+#include "efa_rdm_proto_eager.h"
+#include "efa_rdm_msg.h"
+
+/* List of supported protocols.
+ * The protocols listed here will be tried in the order they're listed.
+ * The first protocol that can be used for the TX operation will be used.
+ */
+struct efa_rdm_proto *efa_rdm_protocols[] = {
+ &efa_rdm_proto_eager,
+};
+
+int efa_rdm_proto_select_send_protocol(struct efa_rdm_ep *ep,
+ struct efa_rdm_peer *peer,
+ const struct fi_msg *msg, uint32_t op,
+ uint64_t flags, struct efa_rdm_ope *txe,
+ struct efa_rdm_proto **proto)
+{
+ /* TODO: Handle memory registration of user buffers.
+ * If MR fails, switch to a different protocol.
+ */
+
+ struct efa_rdm_proto *selected_proto;
+ int req_pkt_type, iface;
+ uint16_t header_flags = 0;
+ bool tagged, delivery_complete_requested;
+ uint64_t effective_flags;
+
+ effective_flags = efa_rdm_msg_get_tx_flags(ep, flags);
+
+ if (effective_flags & FI_INJECT ||
+ efa_rdm_peer_expects_zero_hdr_data_transfer(peer))
+ delivery_complete_requested = false;
+ else
+ delivery_complete_requested = effective_flags & FI_DELIVERY_COMPLETE;
+
+ tagged = (op == ofi_op_tagged);
+
+ txe->ep = ep;
+ txe->iov_count = msg->iov_count;
+ memcpy(txe->iov, msg->msg_iov, sizeof(struct iovec) * msg->iov_count);
+ memset(txe->mr, 0, sizeof(*txe->mr) * msg->iov_count);
+ if (msg->desc)
+ memcpy(txe->desc, msg->desc, sizeof(*msg->desc) * msg->iov_count);
+ else
+ memset(txe->desc, 0, sizeof(*txe->desc) * msg->iov_count);
+ txe->total_len = ofi_total_iov_len(msg->msg_iov, msg->iov_count);
+
+ iface = (msg->desc && msg->desc[0]) ?
+ ((struct efa_mr *) msg->desc[0])->iface :
+ FI_HMEM_SYSTEM;
+
+ /* Logic copied from efa_rdm_txe_max_req_data_capacity */
+ if (efa_rdm_peer_need_raw_addr_hdr(peer))
+ header_flags |= EFA_RDM_REQ_OPT_RAW_ADDR_HDR;
+ else if (efa_rdm_peer_need_connid(peer))
+ header_flags |= EFA_RDM_PKT_CONNID_HDR;
+
+ if (flags & FI_REMOTE_CQ_DATA)
+ header_flags |= EFA_RDM_REQ_OPT_CQ_DATA_HDR;
+
+ for (int i = 0; i < ARRAY_SIZE(efa_rdm_protocols); ++i) {
+ selected_proto = efa_rdm_protocols[i];
+
+ /*
+ * For performance consideration, this function assume the
+ * tagged rtm packet type id is always the correspondent message
+ * rtm packet type id + 1, thus the assertion here.
+ */
+ assert(selected_proto->req_pkt_type_tagged ==
+ selected_proto->req_pkt_type + 1);
+ assert(selected_proto->req_pkt_type_tagged_dc ==
+ selected_proto->req_pkt_type_dc + 1);
+
+ /* TODO: The req_pkt_type is again needed in each protocol when
+ * allocating pkes Option 1: Make pkt headers independent of tag
+ * and DC to avoid these checks Option 2: Store the req_pkt_type
+ * in the txe
+ */
+ req_pkt_type =
+ delivery_complete_requested ?
+ selected_proto->req_pkt_type_dc + tagged :
+ selected_proto->req_pkt_type + tagged;
+
+ /* All protocols other than the eager protocol can benefit from
+ * registering the application buffers.
+ * TODO: Move function to efa_rdm_proto.c
+ */
+ if (selected_proto->can_use_protocol_for_send(
+ txe, req_pkt_type, header_flags, iface)) {
+ *proto = selected_proto;
+ txe->proto = selected_proto;
+ return FI_SUCCESS;
+ }
+ }
+
+ /*
+ * Zero-copy (headerless) sends must fit in a single eager packet.
+ * If we reach here with a zero-copy peer, no protocol was selected,
+ * which means the message is too large for eager — that's a bug.
+ */
+ assert(!efa_rdm_peer_expects_zero_hdr_data_transfer(peer));
+
+ *proto = NULL;
+ txe->proto = NULL;
+ return FI_SUCCESS;
+}
+
+/* Utility funcions */
+
+void efa_rdm_proto_txe_fill(struct efa_rdm_ope *txe, struct efa_rdm_ep *ep,
+ struct efa_rdm_peer *peer, const struct fi_msg *msg,
+ uint32_t op, uint64_t tag, uint64_t flags,
+ uint32_t internal_flags)
+{
+ /* Logic copied from efa_rdm_txe_construct */
+ uint64_t tx_op_flags;
+
+ /* txe->mr, txe->desc, txe->total_len will be filled by
+ * efa_rdm_ope_try_fill_desc in efa_rdm_proto_select_send_protocol
+ */
+
+ txe->type = EFA_RDM_TXE;
+ txe->op = op;
+ txe->tx_id = ofi_buf_index(txe);
+ txe->state = EFA_RDM_TXE_REQ;
+ txe->peer = peer;
+
+ /* peer would be NULL for local read operation */
+ if (txe->peer) {
+ dlist_insert_tail(&txe->peer_entry, &txe->peer->txe_list);
+ }
+
+ txe->internal_flags = internal_flags;
+ txe->bytes_received = 0;
+ txe->bytes_copied = 0;
+ txe->bytes_acked = 0;
+ txe->bytes_sent = 0;
+ txe->window = 0;
+ txe->iov_count = msg->iov_count;
+ txe->rma_iov_count = 0;
+ txe->msg_id = 0;
+ txe->efa_outstanding_tx_ops = 0;
+ dlist_init(&txe->queued_pkts);
+
+ memcpy(txe->iov, msg->msg_iov, sizeof(struct iovec) * msg->iov_count);
+
+ /* cq_entry on completion */
+ txe->cq_entry.op_context = msg->context;
+ txe->cq_entry.data = msg->data;
+ txe->cq_entry.len = ofi_total_iov_len(txe->iov, txe->iov_count);
+ txe->total_len = txe->cq_entry.len;
+ txe->cq_entry.buf =
+ OFI_LIKELY(txe->cq_entry.len > 0) ? txe->iov[0].iov_base : NULL;
+
+ /* set flags */
+ assert(ep->base_ep.util_ep.tx_msg_flags == 0 ||
+ ep->base_ep.util_ep.tx_msg_flags == FI_COMPLETION);
+ tx_op_flags = ep->base_ep.util_ep.tx_op_flags;
+ if (ep->base_ep.util_ep.tx_msg_flags == 0)
+ tx_op_flags &= ~FI_COMPLETION;
+ txe->fi_flags = flags | tx_op_flags;
+ txe->bytes_runt = 0;
+ dlist_init(&txe->entry);
+
+ switch (op) {
+ case ofi_op_tagged:
+ txe->cq_entry.flags = FI_TRANSMIT | FI_MSG | FI_TAGGED;
+ txe->cq_entry.tag = tag;
+ txe->tag = tag;
+ break;
+ case ofi_op_write:
+ txe->cq_entry.flags = FI_RMA | FI_WRITE;
+ break;
+ case ofi_op_read_req:
+ txe->cq_entry.flags = FI_RMA | FI_READ;
+ break;
+ case ofi_op_msg:
+ txe->cq_entry.flags = FI_TRANSMIT | FI_MSG;
+ break;
+ case ofi_op_atomic:
+ txe->cq_entry.flags = (FI_WRITE | FI_ATOMIC);
+ break;
+ case ofi_op_atomic_fetch:
+ case ofi_op_atomic_compare:
+ txe->cq_entry.flags = (FI_READ | FI_ATOMIC);
+ break;
+ default:
+ EFA_WARN(FI_LOG_CQ, "invalid operation type\n");
+ assert(0);
+ }
+
+ dlist_insert_tail(&txe->ep_entry, &ep->txe_list);
+}
diff --git a/prov/efa/src/rdm/efa_rdm_proto.h b/prov/efa/src/rdm/efa_rdm_proto.h
new file mode 100644
index 00000000000..d11e914890c
--- /dev/null
+++ b/prov/efa/src/rdm/efa_rdm_proto.h
@@ -0,0 +1,123 @@
+/* Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+
+#ifndef _EFA_RDM_PROTO_H
+#define _EFA_RDM_PROTO_H
+
+#include "efa.h"
+#include "efa_rdm_pkt_type.h"
+
+/**
+ * @brief Interface for EFA RDM protocols.
+ *
+ * Each protocol (eager, medium, long CTS, long read, runt read)
+ * implements this interface to define how it handles TX and RX operations.
+ *
+ * The TX send path works as follows:
+ *
+ * 1. efa_rdm_proto_select_send_protocol() iterates through the protocol
+ * registry (efa_rdm_protocols[]) in priority order, calling
+ * can_use_protocol_for_send() on each. The first match is selected.
+ *
+ * 2. The selected protocol's construct_tx_pkes() builds the packet
+ * entries and stores them in ep->send_pkt_entry_vec.
+ *
+ * 3. efa_rdm_msg_generic_send posts all packet entries via efa_rdm_pke_sendv().
+ *
+ * 4. handle_tx_pkes_posted() is called for post-send bookkeeping.
+ *
+ * 5. When the device completes a send, the CQ handler invokes the
+ * per-packet callback (set in step 2) which handles completion
+ * logic, CQ reporting, and TXE/PKE release.
+ *
+ * Protocol priority is determined by position in efa_rdm_protocols[].
+ * Protocols are ordered from most restrictive (eager) to least
+ * restrictive (long CTS) so the most efficient protocol is always
+ * selected.
+ */
+struct efa_rdm_proto {
+ /* TX path handlers */
+
+ /* This function determines whether the protocol can be used for a given
+ * send operation.
+ */
+ bool (*can_use_protocol_for_send)(struct efa_rdm_ope *txe,
+ int req_pkt_type,
+ uint16_t header_flags, int iface);
+
+ /* This function will allocate the pkes that need to be sent for a given
+ * TX operation. At the end of this function, ep->send_pkt_entry_vec
+ * will be correctly populated with the all of the pkes that need to be
+ * sent including copying the application data into the pke buffer if
+ * necessary. Each pke will have an appropriate callback function set to
+ * handle the TX completion of that pke. This function also constructs
+ * and returns the txe
+ */
+ int (*construct_tx_pkes)(struct efa_rdm_ep *ep,
+ struct efa_rdm_peer *peer,
+ const struct fi_msg *msg, uint32_t op,
+ uint64_t tag, uint64_t flags,
+ uint32_t internal_flags,
+ struct efa_rdm_ope *txe);
+
+ /* This function is called after all pkes are posted to the EFA device.
+ * It is useful for some protocols: e.g. to register the buffer after
+ * posting a Long CTS RTM pke or to update the number of in flight reads
+ * and read bytes
+ */
+ void (*handle_tx_pkes_posted)(struct efa_rdm_ep *ep,
+ struct efa_rdm_ope *txe);
+
+ /* TX utitlities */
+ int req_pkt_type;
+ int req_pkt_type_tagged;
+ int req_pkt_type_dc;
+ int req_pkt_type_tagged_dc;
+};
+
+/**
+ * @brief Select the appropriate send protocol for a TX operation.
+ *
+ * Iterates through registered protocols in priority order and selects
+ * the first one whose can_use_protocol_for_send() returns true.
+ *
+ * It will also handle memory registration of user buffers. If read based
+ * protocols are appropriate but MR fails, it will automatically switch to a
+ * different protocol.
+ *
+ * @param[in] ep Endpoint
+ * @param[in] peer Peer to send to
+ * @param[in] msg Message descriptor from application
+ * @param[in] op Operation type (ofi_op_msg or ofi_op_tagged)
+ * @param[in] flags Operation flags (FI_INJECT, FI_DELIVERY_COMPLETE, etc.)
+ * @param[out] txe Pre-allocated TXE, partially initialized on return
+ * @param[out] proto Selected protocol, or NULL if none matched
+ * @return 0 on success, negative errno on failure
+ */
+int efa_rdm_proto_select_send_protocol(struct efa_rdm_ep *ep,
+ struct efa_rdm_peer *peer,
+ const struct fi_msg *msg, uint32_t op,
+ uint64_t flags, struct efa_rdm_ope *txe,
+ struct efa_rdm_proto **proto);
+
+/* Utility funcions */
+static inline void
+efa_rdm_proto_handle_tx_pkes_posted_no_op(struct efa_rdm_ep *ep,
+ struct efa_rdm_ope *txe)
+{
+ return;
+};
+
+/**
+ * @brief Initialize a TXE for use by the new protocol interface.
+ *
+ * Similar to efa_rdm_txe_construct but does not set mr, desc, or
+ * total_len since those are already populated by
+ * efa_rdm_proto_select_send_protocol.
+ */
+void efa_rdm_proto_txe_fill(struct efa_rdm_ope *txe, struct efa_rdm_ep *ep,
+ struct efa_rdm_peer *peer, const struct fi_msg *msg,
+ uint32_t op, uint64_t tag, uint64_t flags,
+ uint32_t internal_flags);
+
+#endif /* _EFA_RDM_PROTO_H */
diff --git a/prov/efa/src/rdm/efa_rdm_proto_eager.c b/prov/efa/src/rdm/efa_rdm_proto_eager.c
new file mode 100644
index 00000000000..15806f46e14
--- /dev/null
+++ b/prov/efa/src/rdm/efa_rdm_proto_eager.c
@@ -0,0 +1,222 @@
+/* Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+
+#include "efa_rdm_proto_eager.h"
+#include "efa.h"
+#include "efa_rdm_pke_req.h"
+#include "efa_rdm_pke_utils.h"
+#include "efa_rdm_pkt_type.h"
+
+/*
+ * List of packet types used by this protocol
+ *
+ * For send/recv operations
+ * EFA_RDM_EAGER_MSGRTM_PKT
+ * EFA_RDM_EAGER_TAGRTM_PKT
+ * EFA_RDM_DC_EAGER_MSGRTM_PKT
+ * EFA_RDM_DC_EAGER_TAGRTM_PKT
+ *
+ * For FI_DELIVERY_COMPLETE - shared with other protocols
+ * EFA_RDM_RECEIPT_PKT
+ */
+
+/*
+ * Description of the protocol
+ * https://github.com/ofiwg/libfabric/blob/main/prov/efa/docs/efa_rdm_protocol_v4.md#eager-message-featuresubprotocol
+ */
+
+/**
+ * @brief Check if the eager protocol can handle this send operation.
+ *
+ * Returns true if the message fits in a single MTU-sized packet after
+ * accounting for the request header size.
+ */
+static bool efa_rdm_proto_eager_can_use_for_send(struct efa_rdm_ope *txe,
+ int req_pkt_type,
+ uint16_t header_flags,
+ int iface)
+{
+ size_t max_data_offset, max_rtm_data_capacity;
+
+ /* TODO: For emulated read and atomics, need to consider RMA
+ * IOVs in the header
+ * https://github.com/ofiwg/libfabric/blob/cff899c9ef6dd823a1e3b35d3205622013c6eb6c/prov/efa/src/rdm/efa_rdm_pkt_type.c#L101-L103
+ */
+ max_data_offset = efa_rdm_pkt_type_get_req_hdr_size(req_pkt_type,
+ header_flags, 0);
+ max_rtm_data_capacity = txe->ep->mtu_size - max_data_offset;
+
+ return txe->total_len <= max_rtm_data_capacity;
+}
+
+struct efa_rdm_proto efa_rdm_proto_eager = {
+ .can_use_protocol_for_send = &efa_rdm_proto_eager_can_use_for_send,
+ .construct_tx_pkes = &efa_rdm_proto_eager_construct_tx_pkes,
+ .req_pkt_type = EFA_RDM_EAGER_MSGRTM_PKT,
+ .req_pkt_type_dc = EFA_RDM_DC_EAGER_MSGRTM_PKT,
+ .req_pkt_type_tagged = EFA_RDM_EAGER_TAGRTM_PKT,
+ .req_pkt_type_tagged_dc = EFA_RDM_DC_EAGER_TAGRTM_PKT,
+ .handle_tx_pkes_posted = &efa_rdm_proto_handle_tx_pkes_posted_no_op,
+};
+
+/* TX path callbacks - one callback for each packet type that this protocol uses
+ */
+/**
+ * @brief Handle send completion for a non-DC eager RTM packet.
+ *
+ * Reports the CQ completion, releases the TXE, and releases the
+ * TX packet entry.
+ */
+void efa_rdm_proto_eager_handle_rtm_send_completion(
+ struct efa_rdm_pke *pkt_entry)
+{
+ struct efa_rdm_ope *txe;
+
+ txe = pkt_entry->ope;
+ assert(txe);
+ assert(txe->total_len == pkt_entry->payload_size);
+
+ efa_rdm_ope_handle_send_completed(txe);
+
+ efa_rdm_pke_release_tx(pkt_entry);
+}
+
+/**
+ * @brief Handle send completion for a DC eager RTM packet.
+ *
+ * Only releases the TXE when both all send completions have arrived
+ * (efa_outstanding_tx_ops == 0) and the receipt packet has been received.
+ */
+void efa_rdm_proto_eager_handle_rtm_dc_send_completion(
+ struct efa_rdm_pke *pkt_entry)
+{
+ struct efa_rdm_ope *txe;
+
+ txe = pkt_entry->ope;
+ assert(txe);
+ assert(txe->total_len == pkt_entry->payload_size);
+
+ if (efa_rdm_txe_dc_ready_for_release(txe))
+ efa_rdm_txe_release(txe);
+
+ efa_rdm_pke_release_tx(pkt_entry);
+}
+
+/**
+ * @brief Construct TX packet entries for the eager protocol.
+ *
+ * Allocates a single TX packet entry, initializes the RTM header with
+ * the message payload, and sets the per-packet send completion callback.
+ * Supports both regular and delivery-complete (DC) eager packets.
+ *
+ * On success, ep->send_pkt_entry_vec[0] contains the packet entry and
+ * ep->send_pkt_entry_vec_size is set to 1.
+ *
+ * @return 0 on success, negative errno on failure
+ */
+int efa_rdm_proto_eager_construct_tx_pkes(struct efa_rdm_ep *ep,
+ struct efa_rdm_peer *peer,
+ const struct fi_msg *msg, uint32_t op,
+ uint64_t tag, uint64_t flags,
+ uint32_t internal_flags,
+ struct efa_rdm_ope *txe)
+{
+ int ret, req_pkt_type, pkt_entry_cnt;
+ bool tagged, delivery_complete_requested;
+ struct efa_rdm_pke *pkt_entry = NULL;
+ struct efa_rdm_rtm_base_hdr *rtm_hdr;
+ struct efa_rdm_dc_eager_rtm_base_hdr *dc_base_hdr;
+
+ // Eager protocol sends 1 packet by definition
+ pkt_entry_cnt = 1;
+
+ // Verify that the send queue is not full
+ assert(ep->efa_max_outstanding_tx_ops - ep->efa_outstanding_tx_ops -
+ ep->efa_rnr_queued_pkt_cnt >
+ 0);
+
+ tagged = (op == ofi_op_tagged);
+
+ if (flags & FI_INJECT ||
+ efa_rdm_peer_expects_zero_hdr_data_transfer(peer))
+ delivery_complete_requested = false;
+ else
+ delivery_complete_requested = flags & FI_DELIVERY_COMPLETE;
+
+ req_pkt_type = delivery_complete_requested ?
+ efa_rdm_proto_eager.req_pkt_type_dc + tagged :
+ efa_rdm_proto_eager.req_pkt_type + tagged;
+
+ pkt_entry = efa_rdm_pke_alloc(ep, ep->efa_tx_pkt_pool,
+ EFA_RDM_PKE_FROM_EFA_TX_POOL);
+
+ pkt_entry->ope = txe;
+ pkt_entry->peer = peer;
+ pkt_entry->handle_pke = &efa_rdm_proto_eager_handle_rtm_send_completion;
+
+ if (efa_rdm_peer_expects_zero_hdr_data_transfer(peer)) {
+ /* Zero-copy (headerless) path: send raw user data directly
+ * to the peer's dedicated user_recv_qp without any protocol
+ * headers. Requires completed handshake so we know the peer's
+ * QP info. */
+ assert(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED);
+ pkt_entry->flags |= EFA_RDM_PKE_SEND_TO_USER_RECV_QP |
+ EFA_RDM_PKE_HAS_NO_BASE_HDR;
+
+ ret = efa_rdm_pke_init_payload_from_ope(
+ pkt_entry, txe, 0, 0, txe->total_len);
+ if (ret)
+ goto out;
+ } else {
+ efa_rdm_pke_init_req_hdr_common(pkt_entry, req_pkt_type, txe);
+
+ rtm_hdr = (struct efa_rdm_rtm_base_hdr *) pkt_entry->wiredata;
+ rtm_hdr->flags |= EFA_RDM_REQ_MSG;
+ rtm_hdr->msg_id = txe->msg_id;
+
+ if (tagged) {
+ rtm_hdr->flags |= EFA_RDM_REQ_TAGGED;
+ efa_rdm_pke_set_rtm_tag(pkt_entry, txe->tag);
+ }
+
+ EFA_DBG(FI_LOG_EP_DATA,
+ "eager protocol: dc_requested=%d tagged=%d req_pkt_type=%d\n",
+ delivery_complete_requested, tagged, req_pkt_type);
+
+ if (delivery_complete_requested) {
+ txe->internal_flags |= EFA_RDM_TXE_DELIVERY_COMPLETE_REQUESTED;
+ dc_base_hdr = (struct efa_rdm_dc_eager_rtm_base_hdr *)
+ pkt_entry->wiredata;
+ dc_base_hdr->send_id = txe->tx_id;
+ pkt_entry->handle_pke =
+ &efa_rdm_proto_eager_handle_rtm_dc_send_completion;
+ }
+
+ ret = efa_rdm_pke_init_payload_from_ope(
+ pkt_entry, txe, efa_rdm_pke_get_req_hdr_size(pkt_entry), 0,
+ txe->total_len);
+
+ if (ret)
+ goto out;
+ }
+
+ // Verify that all of the data has been copied to the pke buffer
+ assert(txe->total_len == pkt_entry->payload_size);
+
+ ep->send_pkt_entry_vec[0] = pkt_entry;
+ ep->send_pkt_entry_vec_size = pkt_entry_cnt;
+ EFA_INFO(FI_LOG_EP_DATA,
+ "eager protocol: posting 1 pke, size %lu, msg_id %" PRIu32 "\n",
+ txe->total_len, txe->msg_id);
+
+ return FI_SUCCESS;
+
+out:
+ if (txe) {
+ peer->next_msg_id--;
+ efa_rdm_txe_release(txe);
+ }
+ if (pkt_entry)
+ efa_rdm_pke_release_tx(pkt_entry);
+ return ret;
+}
diff --git a/prov/efa/src/rdm/efa_rdm_proto_eager.h b/prov/efa/src/rdm/efa_rdm_proto_eager.h
new file mode 100644
index 00000000000..19bd43438e4
--- /dev/null
+++ b/prov/efa/src/rdm/efa_rdm_proto_eager.h
@@ -0,0 +1,24 @@
+/* Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+
+#ifndef _EFA_RDM_PROTO_EAGER_H
+#define _EFA_RDM_PROTO_EAGER_H
+
+#include "efa_rdm_proto.h"
+
+extern struct efa_rdm_proto efa_rdm_proto_eager;
+
+int efa_rdm_proto_eager_construct_tx_pkes(struct efa_rdm_ep *ep,
+ struct efa_rdm_peer *peer,
+ const struct fi_msg *msg, uint32_t op,
+ uint64_t tag, uint64_t flags,
+ uint32_t internal_flags,
+ struct efa_rdm_ope *txe);
+
+void efa_rdm_proto_eager_handle_rtm_send_completion(
+ struct efa_rdm_pke *pkt_entry);
+
+void efa_rdm_proto_eager_handle_rtm_dc_send_completion(
+ struct efa_rdm_pke *pkt_entry);
+
+#endif /* _EFA_RDM_PROTO_EAGER_H */
diff --git a/prov/efa/test/efa_unit_test_msg.c b/prov/efa/test/efa_unit_test_msg.c
index a0a30239224..5494c5d5188 100644
--- a/prov/efa/test/efa_unit_test_msg.c
+++ b/prov/efa/test/efa_unit_test_msg.c
@@ -4,6 +4,9 @@
#include "efa_unit_tests.h"
#include "ofi_util.h"
+#include "efa_rdm_msg.h"
+#include "efa_rdm_pke_utils.h"
+#include "efa_rdm_protocol.h"
static void test_efa_msg_recv_prep(struct efa_resource *resource,
@@ -501,3 +504,171 @@ void test_efa_msg_inject_with_large_msg_fails(struct efa_resource **state)
efa_unit_test_buff_destruct(&send_buff);
}
+
+/**
+ * @brief Test efa_rdm_msg_get_tx_flags correctly merges per-op flags with
+ * endpoint tx_op_flags and respects tx_msg_flags for FI_COMPLETION.
+ */
+void test_efa_rdm_msg_get_tx_flags(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_rdm_ep *ep;
+ uint64_t result;
+
+ efa_unit_test_resource_construct_rdm_shm_disabled(resource);
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+
+ /* DC with completion: both DC and COMPLETION should be present */
+ ep->base_ep.util_ep.tx_op_flags = FI_COMPLETION | FI_DELIVERY_COMPLETE;
+ ep->base_ep.util_ep.tx_msg_flags = FI_COMPLETION;
+ result = efa_rdm_msg_get_tx_flags(ep, 0);
+ assert_true(result & FI_DELIVERY_COMPLETE);
+ assert_true(result & FI_COMPLETION);
+
+ /* Selective completion (tx_msg_flags=0): DC present, COMPLETION stripped */
+ ep->base_ep.util_ep.tx_msg_flags = 0;
+ result = efa_rdm_msg_get_tx_flags(ep, 0);
+ assert_true(result & FI_DELIVERY_COMPLETE);
+ assert_false(result & FI_COMPLETION);
+
+ /* Per-op flags merged with ep flags */
+ ep->base_ep.util_ep.tx_op_flags = FI_DELIVERY_COMPLETE;
+ ep->base_ep.util_ep.tx_msg_flags = FI_COMPLETION;
+ result = efa_rdm_msg_get_tx_flags(ep, FI_INJECT);
+ assert_true(result & FI_INJECT);
+ assert_true(result & FI_DELIVERY_COMPLETE);
+}
+
+/**
+ * @brief Test that sending with FI_DELIVERY_COMPLETE in tx_op_flags produces
+ * a DC eager packet type.
+ */
+void test_efa_rdm_msg_send_dc_eager_pkt_type(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_peer *peer;
+ struct efa_rdm_pke *pkt_entry;
+ struct efa_rdm_base_hdr *base_hdr;
+ struct efa_ep_addr raw_addr = {0};
+ size_t raw_addr_len = sizeof(raw_addr);
+ fi_addr_t addr;
+ int ret;
+
+ efa_unit_test_resource_construct_rdm_shm_disabled(resource);
+ efa_unit_test_buff_construct(&send_buff, resource, 64);
+
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+
+ assert_int_equal(
+ fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ assert_int_equal(
+ fi_av_insert(resource->av, &raw_addr, 1, &addr, 0, NULL), 1);
+
+ ep->base_ep.util_ep.tx_op_flags = FI_COMPLETION | FI_DELIVERY_COMPLETE;
+ ep->base_ep.util_ep.tx_msg_flags = FI_COMPLETION;
+
+ /* Mark peer as handshake received so DC send doesn't queue */
+ peer = efa_rdm_ep_get_peer(ep, addr);
+ peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+ peer->extra_info[0] |= EFA_RDM_EXTRA_FEATURE_DELIVERY_COMPLETE;
+
+ g_efa_unit_test_mocks.efa_qp_post_send = &efa_mock_efa_qp_post_send_return_mock;
+ will_return(efa_mock_efa_qp_post_send_return_mock, 0);
+
+ ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), addr, NULL);
+ assert_int_equal(ret, 0);
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
+
+ pkt_entry = efa_rdm_cq_get_pke_from_wr_id_solicited(
+ (uint64_t)g_ibv_submitted_wr_id_vec[0]);
+ base_hdr = efa_rdm_pke_get_base_hdr(pkt_entry);
+ assert_int_equal(base_hdr->type, EFA_RDM_DC_EAGER_MSGRTM_PKT);
+ assert_true(pkt_entry->ope->fi_flags & FI_DELIVERY_COMPLETE);
+
+ efa_rdm_txe_release(pkt_entry->ope);
+ efa_rdm_pke_release_tx(pkt_entry);
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief Test that binding CQ with FI_SELECTIVE_COMPLETION sets tx_msg_flags=0
+ * and that a send without FI_COMPLETION flag does not set FI_COMPLETION in the
+ * ope
+ */
+void test_efa_rdm_msg_send_selective_completion(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_pke *pkt_entry;
+ struct efa_ep_addr raw_addr;
+ size_t raw_addr_len = sizeof(raw_addr);
+ fi_addr_t addr;
+ struct fi_cq_attr cq_attr = { .format = FI_CQ_FORMAT_DATA };
+ int ret;
+
+ /* Construct resource without CQ and EP not enabled */
+ efa_unit_test_resource_construct_no_cq_and_ep_not_enabled(
+ resource, FI_EP_RDM, EFA_FABRIC_NAME);
+
+ /* Open CQ and bind with FI_SELECTIVE_COMPLETION */
+ ret = fi_cq_open(resource->domain, &cq_attr, &resource->cq, NULL);
+ assert_int_equal(ret, 0);
+ ret = fi_ep_bind(resource->ep, &resource->cq->fid,
+ FI_SEND | FI_RECV | FI_SELECTIVE_COMPLETION);
+ assert_int_equal(ret, 0);
+
+ g_efa_unit_test_mocks.efa_qp_post_recv = &efa_mock_efa_qp_post_recv_return_mock;
+
+ /* Disable SHM so send goes through EFA path */
+ bool shm_permitted = false;
+ ret = fi_setopt(&resource->ep->fid, FI_OPT_ENDPOINT,
+ FI_OPT_SHARED_MEMORY_PERMITTED, &shm_permitted,
+ sizeof(shm_permitted));
+ assert_int_equal(ret, 0);
+
+ ret = fi_enable(resource->ep);
+ assert_int_equal(ret, 0);
+
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+
+ /* Verify tx_msg_flags is 0 (selective completion) */
+ assert_int_equal(ep->base_ep.util_ep.tx_msg_flags, 0);
+ /* tx_op_flags should not have FI_COMPLETION */
+ assert_false(ep->base_ep.util_ep.tx_op_flags & FI_COMPLETION);
+
+ /* Set up peer address */
+ ret = fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len);
+ assert_int_equal(ret, 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ ret = fi_av_insert(resource->av, &raw_addr, 1, &addr, 0, NULL);
+ assert_int_equal(ret, 1);
+
+ g_efa_unit_test_mocks.efa_qp_post_send = &efa_mock_efa_qp_post_send_return_mock;
+ will_return(efa_mock_efa_qp_post_send_return_mock, 0);
+
+ efa_unit_test_buff_construct(&send_buff, resource, 64);
+
+ /* Send without FI_COMPLETION flag */
+ ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), addr, NULL);
+ assert_int_equal(ret, 0);
+ assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
+
+ pkt_entry = efa_rdm_cq_get_pke_from_wr_id_solicited(
+ (uint64_t)g_ibv_submitted_wr_id_vec[0]);
+ assert_false(pkt_entry->ope->fi_flags & FI_COMPLETION);
+
+ efa_rdm_txe_release(pkt_entry->ope);
+ efa_rdm_pke_release_tx(pkt_entry);
+ efa_unit_test_buff_destruct(&send_buff);
+}
diff --git a/prov/efa/test/efa_unit_test_ope.c b/prov/efa/test/efa_unit_test_ope.c
index 269f5424375..c17ec6caddd 100644
--- a/prov/efa/test/efa_unit_test_ope.c
+++ b/prov/efa/test/efa_unit_test_ope.c
@@ -4,6 +4,7 @@
#include "efa_unit_tests.h"
#include "rdm/efa_rdm_pke_cmd.h"
#include "rdm/efa_rdm_pke_nonreq.h"
+#include "rdm/efa_rdm_proto_eager.h"
typedef void (*efa_rdm_ope_handle_error_func_t)(struct efa_rdm_ope *ope, int err, int prov_errno);
@@ -1333,6 +1334,8 @@ static void test_efa_rdm_txe_dc_release_common(struct efa_resource *resource, bo
/* Set DC packet type in wiredata */
struct efa_rdm_base_hdr *base_hdr = (struct efa_rdm_base_hdr *)dc_pkt_entry->wiredata;
base_hdr->type = EFA_RDM_DC_EAGER_MSGRTM_PKT;
+ dc_pkt_entry->handle_pke =
+ &efa_rdm_proto_eager_handle_rtm_dc_send_completion;
/* Create fake receipt packet entry */
receipt_pkt_entry = efa_rdm_pke_alloc(efa_rdm_ep, efa_rdm_ep->efa_rx_pkt_pool, EFA_RDM_PKE_FROM_EFA_RX_POOL);
@@ -1349,7 +1352,8 @@ static void test_efa_rdm_txe_dc_release_common(struct efa_resource *resource, bo
if (send_first) {
/* Send completion first - should not release TXE yet */
- efa_rdm_pke_handle_send_completion(dc_pkt_entry);
+ efa_rdm_ep_record_tx_op_completed(efa_rdm_ep, dc_pkt_entry);
+ efa_unit_test_pke_handle_send_completion(dc_pkt_entry);
assert_int_equal(efa_unit_test_get_dlist_length(&efa_rdm_ep->txe_list), 1);
assert_false(efa_rdm_txe_dc_ready_for_release(txe));
if (txe_in_send_state) {
@@ -1380,7 +1384,8 @@ static void test_efa_rdm_txe_dc_release_common(struct efa_resource *resource, bo
}
/* Send completion - should now release TXE */
- efa_rdm_pke_handle_send_completion(dc_pkt_entry);
+ efa_rdm_ep_record_tx_op_completed(efa_rdm_ep, dc_pkt_entry);
+ efa_unit_test_pke_handle_send_completion(dc_pkt_entry);
}
/* Verify TXE is released */
diff --git a/prov/efa/test/efa_unit_test_proto.c b/prov/efa/test/efa_unit_test_proto.c
new file mode 100644
index 00000000000..9a6741e3fe5
--- /dev/null
+++ b/prov/efa/test/efa_unit_test_proto.c
@@ -0,0 +1,420 @@
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All
+ * rights reserved. */
+
+#include "efa_unit_tests.h"
+#include "rdm/efa_rdm_proto.h"
+#include "rdm/efa_rdm_proto_eager.h"
+
+/* Tests from efa_unit_test_proto_select.c */
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All
+ * rights reserved. */
+
+
+/**
+ * @brief Helper to set up an endpoint, peer, and TXE for protocol selection
+ * tests.
+ *
+ * Returns the efa_rdm_ep. Caller must provide a peer_addr output and a
+ * pre-allocated txe pointer output.
+ */
+static struct efa_rdm_ep *setup_proto_select_test(struct efa_resource *resource,
+ fi_addr_t *peer_addr)
+{
+ struct efa_ep_addr raw_addr = {0};
+ size_t raw_addr_len = sizeof(raw_addr);
+ struct efa_rdm_ep *ep;
+
+ efa_unit_test_resource_construct_rdm_shm_disabled(resource);
+
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+
+ assert_int_equal(
+ fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ assert_int_equal(
+ fi_av_insert(resource->av, &raw_addr, 1, peer_addr, 0, NULL),
+ 1);
+
+ return ep;
+}
+
+/**
+ * @brief Test that eager protocol is selected for small messages.
+ */
+void test_proto_select_eager_for_small_msg(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_peer *peer;
+ struct efa_rdm_ope *txe;
+ struct efa_rdm_proto *proto = NULL;
+ fi_addr_t peer_addr;
+ struct fi_msg msg = {0};
+ struct iovec iov;
+ int err;
+
+ ep = setup_proto_select_test(resource, &peer_addr);
+ peer = efa_rdm_ep_get_peer(ep, peer_addr);
+ peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+
+ iov.iov_base = NULL;
+ iov.iov_len = 64; /* Small message, fits in eager */
+ efa_unit_test_construct_msg(&msg, &iov, 1, peer_addr, NULL, 0, NULL);
+
+ txe = ofi_buf_alloc(ep->ope_pool);
+ assert_non_null(txe);
+
+ err = efa_rdm_proto_select_send_protocol(ep, peer, &msg, ofi_op_msg, 0,
+ txe, &proto);
+ assert_int_equal(err, 0);
+ assert_non_null(proto);
+ assert_ptr_equal(proto, &efa_rdm_proto_eager);
+
+ ofi_buf_free(txe);
+}
+
+/**
+ * @brief Test that zero-length messages select eager protocol.
+ */
+void test_proto_select_eager_for_zero_len_msg(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_peer *peer;
+ struct efa_rdm_ope *txe;
+ struct efa_rdm_proto *proto = NULL;
+ fi_addr_t peer_addr;
+ struct fi_msg msg = {0};
+ struct iovec iov;
+ int err;
+
+ ep = setup_proto_select_test(resource, &peer_addr);
+ peer = efa_rdm_ep_get_peer(ep, peer_addr);
+ peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+
+ iov.iov_base = NULL;
+ iov.iov_len = 0;
+ efa_unit_test_construct_msg(&msg, &iov, 1, peer_addr, NULL, 0, NULL);
+
+ txe = ofi_buf_alloc(ep->ope_pool);
+ assert_non_null(txe);
+
+ err = efa_rdm_proto_select_send_protocol(ep, peer, &msg, ofi_op_msg, 0,
+ txe, &proto);
+ assert_int_equal(err, 0);
+ assert_ptr_equal(proto, &efa_rdm_proto_eager);
+
+ ofi_buf_free(txe);
+}
+
+/**
+ * @brief Test that eager construct_tx_pkes produces exactly 1 PKE with
+ * the correct callback set.
+ */
+void test_proto_eager_construct_pkes_single_pke(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_peer *peer;
+ struct efa_rdm_ope *txe;
+ fi_addr_t peer_addr;
+ struct efa_ep_addr raw_addr = {0};
+ size_t raw_addr_len = sizeof(raw_addr);
+ struct fi_msg msg = {0};
+ struct iovec iov;
+ int err;
+
+ efa_unit_test_resource_construct_rdm_shm_disabled(resource);
+ efa_unit_test_buff_construct(&send_buff, resource, 64);
+
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+
+ assert_int_equal(
+ fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ assert_int_equal(
+ fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL),
+ 1);
+
+ peer = efa_rdm_ep_get_peer(ep, peer_addr);
+ peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+
+ iov.iov_base = send_buff.buff;
+ iov.iov_len = send_buff.size;
+ efa_unit_test_construct_msg(&msg, &iov, 1, peer_addr, NULL, 0,
+ (void **) &send_buff.mr);
+
+ txe = ofi_buf_alloc(ep->ope_pool);
+ assert_non_null(txe);
+
+ /* Fill TXE as generic_send would before calling construct_tx_pkes */
+ txe->ep = ep;
+ efa_rdm_proto_txe_fill(txe, ep, peer, &msg, ofi_op_msg, 0, 0, 0);
+ txe->msg_id = peer->next_msg_id++;
+ txe->proto = &efa_rdm_proto_eager;
+
+ err = efa_rdm_proto_eager.construct_tx_pkes(ep, peer, &msg, ofi_op_msg,
+ 0, 0, 0, txe);
+ assert_int_equal(err, 0);
+ assert_int_equal(ep->send_pkt_entry_vec_size, 1);
+ assert_non_null(ep->send_pkt_entry_vec[0]);
+ assert_non_null(ep->send_pkt_entry_vec[0]->handle_pke);
+ assert_ptr_equal(ep->send_pkt_entry_vec[0]->ope, txe);
+
+ /* Clean up */
+ efa_rdm_pke_release_tx(ep->send_pkt_entry_vec[0]);
+ efa_rdm_txe_release(txe);
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief Test that eager send completion callback releases TXE and PKE
+ * for non-DC messages.
+ */
+void test_proto_eager_send_completion_releases_txe(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_peer *peer;
+ struct efa_rdm_pke *pkt_entry;
+ fi_addr_t peer_addr;
+ struct efa_ep_addr raw_addr = {0};
+ size_t raw_addr_len = sizeof(raw_addr);
+ int err;
+
+ efa_unit_test_resource_construct_rdm_shm_disabled(resource);
+ efa_unit_test_buff_construct(&send_buff, resource, 64);
+
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+
+ assert_int_equal(
+ fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ assert_int_equal(
+ fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL),
+ 1);
+
+ peer = efa_rdm_ep_get_peer(ep, peer_addr);
+ peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+
+ /* Mock efa_qp_post_send to succeed */
+ g_efa_unit_test_mocks.efa_qp_post_send =
+ &efa_mock_efa_qp_post_send_return_mock;
+ will_return_int_maybe(efa_mock_efa_qp_post_send_return_mock, 0);
+
+ /* Send a message via fi_send which goes through the new code path */
+ err = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), peer_addr, NULL);
+ assert_int_equal(err, 0);
+ assert_int_equal(efa_unit_test_get_dlist_length(&ep->txe_list), 1);
+
+ /* Get the TXE and PKE */
+ pkt_entry = ep->send_pkt_entry_vec[0];
+ assert_non_null(pkt_entry->handle_pke);
+
+ /* Simulate send completion: record_tx_op_completed + callback */
+ efa_rdm_ep_record_tx_op_completed(ep, pkt_entry);
+ pkt_entry->handle_pke(pkt_entry);
+
+ /* TXE should be released */
+ assert_int_equal(efa_unit_test_get_dlist_length(&ep->txe_list), 0);
+
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief Test that eager assigns msg_id from peer->next_msg_id.
+ */
+void test_proto_eager_assigns_msg_id(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_peer *peer;
+ fi_addr_t peer_addr;
+ struct efa_ep_addr raw_addr = {0};
+ size_t raw_addr_len = sizeof(raw_addr);
+ uint32_t initial_msg_id;
+ int err;
+
+ efa_unit_test_resource_construct_rdm_shm_disabled(resource);
+ efa_unit_test_buff_construct(&send_buff, resource, 64);
+
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+
+ assert_int_equal(
+ fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ assert_int_equal(
+ fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL),
+ 1);
+
+ peer = efa_rdm_ep_get_peer(ep, peer_addr);
+ peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+ initial_msg_id = peer->next_msg_id;
+
+ g_efa_unit_test_mocks.efa_qp_post_send =
+ &efa_mock_efa_qp_post_send_return_mock;
+ will_return_int_maybe(efa_mock_efa_qp_post_send_return_mock, 0);
+
+ err = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), peer_addr, NULL);
+ assert_int_equal(err, 0);
+
+ /* msg_id should have been assigned and next_msg_id incremented */
+ struct efa_rdm_ope *txe =
+ container_of(ep->txe_list.next, struct efa_rdm_ope, ep_entry);
+ assert_int_equal(txe->msg_id, initial_msg_id);
+ assert_int_equal(peer->next_msg_id, initial_msg_id + 1);
+
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief Test that eager construct_tx_pkes produces a headerless PKE with
+ * EFA_RDM_PKE_SEND_TO_USER_RECV_QP flag when the peer expects zero-copy
+ * (headerless) data transfer.
+ */
+void test_proto_eager_construct_pkes_zero_copy(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_peer *peer;
+ struct efa_rdm_ope *txe;
+ fi_addr_t peer_addr;
+ struct efa_ep_addr raw_addr = {0};
+ size_t raw_addr_len = sizeof(raw_addr);
+ struct fi_msg msg = {0};
+ struct iovec iov;
+ int err;
+
+ efa_unit_test_resource_construct_rdm_shm_disabled(resource);
+ efa_unit_test_buff_construct(&send_buff, resource, 64);
+
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+
+ assert_int_equal(
+ fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ assert_int_equal(
+ fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL),
+ 1);
+
+ peer = efa_rdm_ep_get_peer(ep, peer_addr);
+ peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+ /* Mark peer as expecting zero-copy transfer */
+ peer->extra_info[0] |= EFA_RDM_EXTRA_FEATURE_REQUEST_USER_RECV_QP;
+ peer->user_recv_qp.qpn = 99;
+ peer->user_recv_qp.qkey = 0xABCD;
+
+ iov.iov_base = send_buff.buff;
+ iov.iov_len = send_buff.size;
+ efa_unit_test_construct_msg(&msg, &iov, 1, peer_addr, NULL, 0,
+ (void **) &send_buff.mr);
+
+ txe = ofi_buf_alloc(ep->ope_pool);
+ assert_non_null(txe);
+ txe->ep = ep;
+ efa_rdm_proto_txe_fill(txe, ep, peer, &msg, ofi_op_msg, 0, 0, 0);
+ txe->msg_id = peer->next_msg_id++;
+ txe->proto = &efa_rdm_proto_eager;
+
+ err = efa_rdm_proto_eager.construct_tx_pkes(ep, peer, &msg, ofi_op_msg,
+ 0, 0, 0, txe);
+ assert_int_equal(err, 0);
+ assert_int_equal(ep->send_pkt_entry_vec_size, 1);
+
+ /* Verify headerless packet properties */
+ struct efa_rdm_pke *pke = ep->send_pkt_entry_vec[0];
+ assert_true(pke->flags & EFA_RDM_PKE_SEND_TO_USER_RECV_QP);
+ assert_true(pke->flags & EFA_RDM_PKE_HAS_NO_BASE_HDR);
+ assert_int_equal(pke->pkt_size, 64);
+ assert_int_equal(pke->payload_size, 64);
+
+ ofi_buf_free(pke);
+ efa_unit_test_buff_destruct(&send_buff);
+}
+
+/**
+ * @brief Test that a send is queued before handshake and dequeued after
+ * handshake completes when the peer may have zero-copy mode enabled.
+ */
+void test_proto_eager_queue_dequeue_handshake(struct efa_resource **state)
+{
+ struct efa_resource *resource = *state;
+ struct efa_unit_test_buff send_buff;
+ struct efa_rdm_ep *ep;
+ struct efa_rdm_peer *peer;
+ struct efa_domain *domain;
+ struct efa_rdm_ope *txe;
+ fi_addr_t peer_addr;
+ struct efa_ep_addr raw_addr = {0};
+ size_t raw_addr_len = sizeof(raw_addr);
+ struct fi_cq_tagged_entry cq_entry;
+ int ret;
+
+ efa_unit_test_resource_construct_rdm_shm_disabled(resource);
+ efa_unit_test_buff_construct(&send_buff, resource, 64);
+
+ ep = container_of(resource->ep, struct efa_rdm_ep,
+ base_ep.util_ep.ep_fid);
+ domain = efa_rdm_ep_domain(ep);
+
+ assert_int_equal(
+ fi_getname(&resource->ep->fid, &raw_addr, &raw_addr_len), 0);
+ raw_addr.qpn = 1;
+ raw_addr.qkey = 0x1234;
+ assert_int_equal(
+ fi_av_insert(resource->av, &raw_addr, 1, &peer_addr, 0, NULL),
+ 1);
+
+ peer = efa_rdm_ep_get_peer(ep, peer_addr);
+ peer->flags &= ~EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+ ep->peer_may_have_zcpy_rx = true;
+
+ /* Mock post_send for handshake trigger */
+ g_efa_unit_test_mocks.efa_qp_post_send = &efa_mock_efa_qp_post_send_return_mock;
+ will_return(efa_mock_efa_qp_post_send_return_mock, 0);
+
+ ret = fi_send(resource->ep, send_buff.buff, send_buff.size,
+ fi_mr_desc(send_buff.mr), peer_addr, NULL);
+ assert_int_equal(ret, 0);
+
+ /* Verify the OPE is in the queued list */
+ assert_int_equal(ep->ope_queued_before_handshake_cnt, 1);
+ txe = container_of(domain->ope_queued_list.next,
+ struct efa_rdm_ope, queued_entry);
+ assert_true(dlist_entry_in_list(&txe->queued_entry,
+ &domain->ope_queued_list));
+
+ /* Simulate handshake received */
+ peer->flags |= EFA_RDM_PEER_HANDSHAKE_RECEIVED;
+
+ /* Mock post_send for the actual data packet */
+ will_return(efa_mock_efa_qp_post_send_return_mock, 0);
+
+ /* Progress via fi_cq_read which calls efa_domain_progress */
+ ret = fi_cq_read(resource->cq, &cq_entry, 1);
+ assert_int_equal(ret, -FI_EAGAIN);
+
+ /* Verify the OPE was dequeued and sent */
+ assert_int_equal(ep->ope_queued_before_handshake_cnt, 0);
+ assert_true(dlist_empty(&domain->ope_queued_list));
+
+ efa_unit_test_buff_destruct(&send_buff);
+}
diff --git a/prov/efa/test/efa_unit_test_rnr.c b/prov/efa/test/efa_unit_test_rnr.c
index 8b67d9c1b97..2b50dc05e69 100644
--- a/prov/efa/test/efa_unit_test_rnr.c
+++ b/prov/efa/test/efa_unit_test_rnr.c
@@ -69,7 +69,7 @@ void test_efa_rnr_queue_and_resend_impl(struct efa_resource **state, uint32_t op
assert_int_equal(efa_rdm_ep->efa_rnr_queued_pkt_cnt, 0);
assert_int_equal(efa_rdm_ep_get_peer(efa_rdm_ep, peer_addr)->rnr_queued_pkt_cnt, 0);
- efa_rdm_pke_handle_send_completion(pkt_entry);
+ efa_unit_test_pke_handle_send_completion(pkt_entry);
efa_unit_test_buff_destruct(&send_buff);
}
diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c
index 6f6f7771361..466f783161c 100644
--- a/prov/efa/test/efa_unit_tests.c
+++ b/prov/efa/test/efa_unit_tests.c
@@ -491,6 +491,9 @@ int main(void)
cmocka_unit_test_setup_teardown(test_efa_rdm_rma_inject_write_0_byte_no_shm, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_rma_inject_writedata_0_byte_no_shm, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_msg_send_0_byte_with_inject_flag, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_rdm_msg_get_tx_flags, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_rdm_msg_send_dc_eager_pkt_type, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_efa_rdm_msg_send_selective_completion, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_rma_write_0_byte_with_inject_flag, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_data_path_direct_rdma_read_multiple_sge_fail, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_data_path_direct_rdma_write_multiple_sge_fail, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
@@ -587,6 +590,15 @@ int main(void)
cmocka_unit_test_setup_teardown(test_efa_rdm_rma_partial_post_retry_no_double_free, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
cmocka_unit_test_setup_teardown(test_efa_rdm_rma_partial_post_retry_no_double_free_read, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
/* end efa_unit_test_rdm_rma.c */
+
+ /* begin efa_unit_test_proto.c */
+ cmocka_unit_test_setup_teardown(test_proto_select_eager_for_small_msg, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_proto_select_eager_for_zero_len_msg, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_proto_eager_construct_pkes_single_pke, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_proto_eager_construct_pkes_zero_copy, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_proto_eager_queue_dequeue_handshake, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_proto_eager_send_completion_releases_txe, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+ cmocka_unit_test_setup_teardown(test_proto_eager_assigns_msg_id, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
};
cmocka_set_message_output(CM_OUTPUT_XML);
diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h
index 1c9b021f051..a62af7864ec 100644
--- a/prov/efa/test/efa_unit_tests.h
+++ b/prov/efa/test/efa_unit_tests.h
@@ -14,10 +14,25 @@
#include
#include
#include
-#include "stdio.h"
+#include
#include "efa.h"
+#include "efa_rdm_pke_cmd.h"
#include "efa_unit_test_mocks.h"
+/*
+ * TODO: Remove this utility once all protocols are migrated to the
+ * refactored code path with callbacks. At that point, all PKEs will
+ * have a callback set and we can call pkt_entry->handle_pke directly.
+ */
+static inline void
+efa_unit_test_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
+{
+ if (pkt_entry->handle_pke)
+ pkt_entry->handle_pke(pkt_entry);
+ else
+ efa_rdm_pke_handle_send_completion(pkt_entry);
+}
+
extern int g_ibv_ah_limit;
extern int g_ibv_ah_cnt;
extern int g_self_ah_cnt;
@@ -423,6 +438,9 @@ void test_efa_msg_sendmsg_multi_iov_second_desc_hmem_fails();
void test_efa_msg_sendmsg_inject_with_large_msg_fails();
void test_efa_msg_inject_with_large_msg_fails();
void test_efa_rdm_msg_send_0_byte_with_inject_flag();
+void test_efa_rdm_msg_get_tx_flags();
+void test_efa_rdm_msg_send_dc_eager_pkt_type();
+void test_efa_rdm_msg_send_selective_completion();
void test_efa_rdm_msg_send_0_byte_no_shm();
void test_efa_rdm_msg_sendv_0_byte_no_shm();
void test_efa_rdm_msg_sendmsg_0_byte_no_shm();
@@ -567,20 +585,24 @@ void test_efa_rdm_rma_partial_post_retry_no_double_free();
void test_efa_rdm_rma_partial_post_retry_no_double_free_read();
void test_efa_rdm_msg_send_multi_pkt_sendv_fail_no_inflight();
/* end efa_unit_test_rdm_rma.c */
-
static inline
int efa_unit_test_get_dlist_length(struct dlist_entry *head)
{
int i = 0;
struct dlist_entry *item;
-
dlist_foreach(head, item) {
i++;
}
-
return i;
}
-
void efa_unit_test_rdm_0byte_prep(struct efa_resource *resource, fi_addr_t *addr);
-
+/* Protocol TX path tests */
+void test_proto_select_eager_for_small_msg();
+void test_proto_select_eager_for_zero_len_msg();
+void test_proto_select_longread_over_longcts_with_p2p();
+void test_proto_eager_construct_pkes_single_pke();
+void test_proto_eager_construct_pkes_zero_copy();
+void test_proto_eager_queue_dequeue_handshake();
+void test_proto_eager_send_completion_releases_txe();
+void test_proto_eager_assigns_msg_id();
#endif