Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
65c1d1b
prov/efa: Add ep->send_pkt_entry_vec_size
sunkuamzn Mar 12, 2026
9775660
prov/efa: Check if the send queue is full earlier
sunkuamzn Mar 25, 2026
c92b331
prov/efa: Inline and remove efa_rdm_ep_alloc_txe
sunkuamzn Mar 25, 2026
7d9e360
prov/efa: Move memory alignment helper to header as static inline
sunkuamzn Mar 17, 2026
06f9c5c
prov/efa: Add per-packet callback for TX send completions
sunkuamzn Mar 10, 2026
2a34768
prov/efa: Introduce the efa_rdm_proto protocol interface
sunkuamzn Mar 10, 2026
e11d8f8
prov/efa: Add eager protocol to the new interface
sunkuamzn Mar 14, 2026
0466cad
prov/efa: Add protocol selection logic
sunkuamzn Mar 11, 2026
da7b386
prov/efa: Implement eager protocol packet construction
sunkuamzn Mar 14, 2026
6b58a9c
prov/efa: Implement eager protocol send completion callbacks
sunkuamzn Mar 14, 2026
bccadbb
prov/efa: Activate the refactored eager protocol in the send path
sunkuamzn Mar 13, 2026
61eedcb
prov/efa: Add efa_rdm_proto_txe_fill for new protocol TXE setup
sunkuamzn Mar 20, 2026
3da67aa
prov/efa: Add medium protocol to the new interface
sunkuamzn Apr 10, 2026
e262b44
prov/efa: Add name field to the protocol interface
sunkuamzn Mar 21, 2026
a13d445
prov/efa: Add long CTS protocol to the new interface
sunkuamzn Mar 21, 2026
34799a5
prov/efa: Extend can_use_protocol_for_send with peer and p2p args
sunkuamzn Mar 21, 2026
a050ce2
prov/efa: Add long read protocol to the new interface
sunkuamzn Mar 21, 2026
05a8fa8
prov/efa: Add runt read protocol to the new interface
sunkuamzn Mar 21, 2026
8ca33aa
prov/efa: Remove efa_rdm_pke_init_rtm_with_payload
sunkuamzn Mar 24, 2026
9bbb163
prov/efa: Reduce fallback send path to zero-copy only
sunkuamzn Mar 24, 2026
4dd7155
prov/efa: Remove efa_rdm_ope_post_send_fallback
sunkuamzn Mar 24, 2026
37d173a
prov/efa: Fix unit tests to use per-packet send completion callback
sunkuamzn Mar 16, 2026
e967530
prov/efa: Remove DC-before-handshake unit tests
sunkuamzn Mar 13, 2026
c2c99ea
prov/efa: Reimplement READ NACK send completion test for new interface
sunkuamzn Mar 21, 2026
ec57e37
prov/efa: Add protocol selection unit tests
sunkuamzn Apr 11, 2026
5de9539
prov/efa: Add eager protocol TX unit tests
sunkuamzn Apr 11, 2026
eac0407
prov/efa: Add medium protocol TX unit tests
sunkuamzn Apr 11, 2026
31fc136
prov/efa: Add long CTS protocol TX unit test
sunkuamzn Apr 11, 2026
cd868e6
prov/efa: Add long read protocol TX unit test
sunkuamzn Apr 11, 2026
30f6a47
prov/efa: Add runt read protocol TX unit test
sunkuamzn Apr 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions prov/efa/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,13 @@ _efa_files = \
prov/efa/src/rdm/efa_rdm_tracepoint_def.c \
prov/efa/src/rdm/efa_rdm_srx.c \
prov/efa/src/rdm/efa_rdm_util.c \
prov/efa/src/rdm/efa_rdm_mr.c
prov/efa/src/rdm/efa_rdm_mr.c \
prov/efa/src/rdm/efa_rdm_proto.c \
prov/efa/src/rdm/efa_rdm_proto_eager.c \
prov/efa/src/rdm/efa_rdm_proto_medium.c \
prov/efa/src/rdm/efa_rdm_proto_longcts.c \
prov/efa/src/rdm/efa_rdm_proto_longread.c \
prov/efa/src/rdm/efa_rdm_proto_runtread.c

if ENABLE_EFA_UNIT_TEST
_efa_files += prov/efa/test/efa_unit_test_data_path_ops.c
Expand Down Expand Up @@ -140,7 +146,13 @@ _efa_headers = \
prov/efa/src/rdm/efa_rdm_tracepoint.h \
prov/efa/src/rdm/efa_rdm_srx.h \
prov/efa/src/rdm/efa_rdm_util.h \
prov/efa/src/rdm/efa_rdm_mr.h
prov/efa/src/rdm/efa_rdm_mr.h \
prov/efa/src/rdm/efa_rdm_proto.h \
prov/efa/src/rdm/efa_rdm_proto_eager.h \
prov/efa/src/rdm/efa_rdm_proto_medium.h \
prov/efa/src/rdm/efa_rdm_proto_longcts.h \
prov/efa/src/rdm/efa_rdm_proto_longread.h \
prov/efa/src/rdm/efa_rdm_proto_runtread.h

if HAVE_LTTNG
efa_LDFLAGS += -llttng-ust
Expand Down Expand Up @@ -174,7 +186,8 @@ nodist_prov_efa_test_efa_unit_test_SOURCES = \
prov/efa/test/efa_unit_test_msg.c \
prov/efa/test/efa_unit_test_rma.c \
prov/efa/test/efa_unit_test_rdm_rma.c \
prov/efa/test/efa_unit_test_data_path_direct.c
prov/efa/test/efa_unit_test_data_path_direct.c \
prov/efa/test/efa_unit_test_proto.c


efa_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/prov/efa/test $(cmocka_CPPFLAGS)
Expand Down
15 changes: 0 additions & 15 deletions prov/efa/src/efa.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,6 @@
#define EFA_DEFAULT_INTER_MIN_READ_WRITE_SIZE (65536)
#define EFA_DEFAULT_INTRA_MAX_GDRCOPY_FROM_DEV_SIZE (3072)

/*
* The default memory alignment
*/
#define EFA_RDM_DEFAULT_MEMORY_ALIGNMENT (8)

/*
* The CUDA memory alignment
*/
#define EFA_RDM_CUDA_MEMORY_ALIGNMENT (64)

/*
* The alignment to support in-order aligned ops.
*/
#define EFA_RDM_IN_ORDER_ALIGNMENT (128)

/*
* Set alignment to x86 cache line size.
*/
Expand Down
4 changes: 0 additions & 4 deletions prov/efa/src/rdm/efa_rdm_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ efa_rdm_atomic_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
return NULL;
}

efa_domain_ope_list_lock(efa_rdm_ep_domain(efa_rdm_ep));
dlist_insert_tail(&txe->ep_entry, &efa_rdm_ep->txe_list);
efa_domain_ope_list_unlock(efa_rdm_ep_domain(efa_rdm_ep));

ofi_ioc_to_iov(msg_atomic->msg_iov, iov, msg_atomic->iov_count, datatype_size);
msg.addr = msg_atomic->addr;
msg.msg_iov = iov;
Expand Down
32 changes: 30 additions & 2 deletions prov/efa/src/rdm/efa_rdm_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -842,8 +842,36 @@ enum ibv_wc_status efa_rdm_cq_process_wc(struct efa_ibv_cq *cq, struct efa_rdm_e
#if ENABLE_DEBUG
ep->send_comps++;
#endif
efa_rdm_pke_handle_send_completion(pkt_entry);
efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
if (pkt_entry->callback) {
efa_rdm_ep_record_tx_op_completed(pkt_entry->ep,
pkt_entry);
/*
* For a send completion, pkt_entry->peer can be
* NULL in 3 situations:
* 1. the pkt_entry is used for a local read
* operation
* 2. a new peer with same gid+qpn was inserted
* to av, thus the peer was removed from AV.
* 3. application removed the peer's address
* from av. In 1, we should proceed. For 2 and
* 3, the send completion should be ignored.
*/
if (!pkt_entry->peer &&
!(pkt_entry->flags &
EFA_RDM_PKE_LOCAL_READ)) {
EFA_WARN(
FI_LOG_CQ,
"ignoring send completion of a "
"packet to a removed peer.\n");
efa_rdm_pke_release_tx(pkt_entry);
} else {
pkt_entry->callback(pkt_entry);
}
efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
} else {
efa_rdm_pke_handle_send_completion(pkt_entry);
efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
}
break;
case IBV_WC_RECV:
/* efa_rdm_cq_handle_recv_completion does additional work to determine the source
Expand Down
46 changes: 37 additions & 9 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ struct efa_rdm_ep_queued_copy {

#define EFA_RDM_MAX_QUEUED_COPY (8)

/*
* The default memory alignment
*/
#define EFA_RDM_DEFAULT_MEMORY_ALIGNMENT (8)

/*
* The CUDA memory alignment
*/
#define EFA_RDM_CUDA_MEMORY_ALIGNMENT (64)

/*
* The alignment to support in-order aligned ops.
*/
#define EFA_RDM_IN_ORDER_ALIGNMENT (128)

/**
* Max number of opes queued before handshake is made
* with their peers. This cnt is per EP.
Expand Down Expand Up @@ -195,7 +210,8 @@ struct efa_rdm_ep {
struct efa_rdm_pke **pke_vec;
/* Work arrays for efa_rdm_ope_post_send to avoid stack allocation */
struct efa_rdm_pke **send_pkt_entry_vec;
int *send_pkt_entry_size_vec;
size_t *send_pkt_entry_data_sizes;
size_t send_pkt_entry_vec_size;
struct dlist_entry entry;
/* the count of opes queued before handshake is made with their peers */
size_t ope_queued_before_handshake_cnt;
Expand All @@ -220,13 +236,6 @@ struct efa_rdm_peer *efa_rdm_ep_get_peer_explicit(struct efa_rdm_ep *ep, fi_addr
int32_t efa_rdm_ep_get_peer_ahn(struct efa_rdm_ep *ep, fi_addr_t addr);
struct efa_rdm_peer *efa_rdm_ep_get_peer_implicit(struct efa_rdm_ep *ep, fi_addr_t addr);

struct efa_rdm_ope *efa_rdm_ep_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
struct efa_rdm_peer *peer,
const struct fi_msg *msg,
uint32_t op,
uint64_t tag,
uint64_t flags);

struct efa_rdm_ope *efa_rdm_ep_alloc_rxe(struct efa_rdm_ep *ep,
struct efa_rdm_peer *peer, uint32_t op);

Expand All @@ -253,7 +262,26 @@ void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_ent
ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep,
struct dlist_entry *pkts);

size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface iface);
/**
* @brief Get memory alignment for given ep and hmem iface
*
* @param ep efa rdm ep
* @param iface hmem iface
* @return size_t the memory alignment
*/
static inline size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep,
enum fi_hmem_iface iface)
{
size_t memory_alignment = EFA_RDM_DEFAULT_MEMORY_ALIGNMENT;

if (ep->sendrecv_in_order_aligned_128_bytes) {
memory_alignment = EFA_RDM_IN_ORDER_ALIGNMENT;
} else if (iface == FI_HMEM_CUDA) {
memory_alignment = EFA_RDM_CUDA_MEMORY_ALIGNMENT;
}

return memory_alignment;
}

static inline
struct efa_domain *efa_rdm_ep_domain(struct efa_rdm_ep *ep)
Expand Down
14 changes: 9 additions & 5 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -680,9 +680,13 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info,
goto err_free_pke_vec;
}

efa_rdm_ep->send_pkt_entry_size_vec = calloc(sizeof(int), efa_base_ep_get_tx_pool_size(&efa_rdm_ep->base_ep));
if (!efa_rdm_ep->send_pkt_entry_size_vec) {
EFA_WARN(FI_LOG_EP_CTRL, "cannot alloc memory for efa_rdm_ep->send_pkt_entry_size_vec!\n");
efa_rdm_ep->send_pkt_entry_data_sizes =
calloc(sizeof(size_t),
efa_base_ep_get_tx_pool_size(&efa_rdm_ep->base_ep));
if (!efa_rdm_ep->send_pkt_entry_data_sizes) {
EFA_WARN(FI_LOG_EP_CTRL,
"cannot alloc memory for "
"efa_rdm_ep->send_pkt_entry_data_sizes!\n");
ret = -FI_ENOMEM;
goto err_free_send_pkt_entry_vec;
}
Expand Down Expand Up @@ -1190,8 +1194,8 @@ static int efa_rdm_ep_close(struct fid *fid)
free(efa_rdm_ep->pke_vec);
if (efa_rdm_ep->send_pkt_entry_vec)
free(efa_rdm_ep->send_pkt_entry_vec);
if (efa_rdm_ep->send_pkt_entry_size_vec)
free(efa_rdm_ep->send_pkt_entry_size_vec);
if (efa_rdm_ep->send_pkt_entry_data_sizes)
free(efa_rdm_ep->send_pkt_entry_data_sizes);

ofi_genlock_unlock(&domain->srx_lock);

Expand Down
61 changes: 7 additions & 54 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,36 +317,6 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe
return err;
}



/* create a new txe */
struct efa_rdm_ope *efa_rdm_ep_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
struct efa_rdm_peer *peer,
const struct fi_msg *msg,
uint32_t op,
uint64_t tag,
uint64_t flags)
{
struct efa_rdm_ope *txe;

txe = ofi_buf_alloc(efa_rdm_ep->ope_pool);
if (OFI_UNLIKELY(!txe)) {
EFA_DBG(FI_LOG_EP_CTRL, "TX entries exhausted.\n");
return NULL;
}

efa_rdm_txe_construct(txe, efa_rdm_ep, peer, msg, op, flags);
if (op == ofi_op_tagged) {
txe->cq_entry.tag = tag;
txe->tag = tag;
}

efa_domain_ope_list_lock(efa_rdm_ep_domain(efa_rdm_ep));
dlist_insert_tail(&txe->ep_entry, &efa_rdm_ep->txe_list);
efa_domain_ope_list_unlock(efa_rdm_ep_domain(efa_rdm_ep));
return txe;
}

/**
* @brief record the event that a TX op has been submitted
*
Expand Down Expand Up @@ -674,15 +644,18 @@ static ssize_t efa_rdm_ep_handshake_common(struct efa_rdm_ep *ep, struct efa_rdm

msg.addr = peer->conn->fi_addr;

txe = efa_rdm_ep_alloc_txe(ep, peer, &msg, ofi_op_write, 0, 0);

txe = ofi_buf_alloc(ep->ope_pool);
if (OFI_UNLIKELY(!txe)) {
EFA_WARN(FI_LOG_EP_CTRL, "TX entries exhausted.\n");
return -FI_EAGAIN;
}

/* efa_rdm_ep_alloc_txe() joins ep->base_ep.util_ep.tx_op_flags and passed in flags,
* reset to desired flags (remove things like FI_DELIVERY_COMPLETE, and FI_COMPLETION)
efa_rdm_txe_construct(txe, ep, peer, &msg, ofi_op_write, 0);

/*
* efa_rdm_txe_construct() joins ep->base_ep.util_ep.tx_op_flags and
* passed in flags, reset to desired flags (remove things like
* FI_DELIVERY_COMPLETE, and FI_COMPLETION)
*/
txe->fi_flags = EFA_RDM_TXE_NO_COMPLETION | EFA_RDM_TXE_NO_COUNTER;
txe->internal_flags |= EFA_RDM_OPE_INTERNAL;
Expand Down Expand Up @@ -1068,26 +1041,6 @@ void efa_rdm_ep_post_internal_rx_pkts(struct efa_rdm_ep *ep)
efa_base_ep_write_eq_error(&ep->base_ep, err, FI_EFA_ERR_INTERNAL_RX_BUF_POST);
}

/**
* @brief Get memory alignment for given ep and hmem iface
*
* @param ep efa rdm ep
* @param iface hmem iface
* @return size_t the memory alignment
*/
size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface iface)
{
size_t memory_alignment = EFA_RDM_DEFAULT_MEMORY_ALIGNMENT;

if (ep->sendrecv_in_order_aligned_128_bytes) {
memory_alignment = EFA_RDM_IN_ORDER_ALIGNMENT;
} else if (iface == FI_HMEM_CUDA) {
memory_alignment = EFA_RDM_CUDA_MEMORY_ALIGNMENT;
}

return memory_alignment;
}

/**
* @brief Enforce a handshake to made for given txe.
* It will trigger a handshake with peer and choose to
Expand Down
Loading
Loading