Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,17 @@ void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke
pkt_entry->peer->efa_outstanding_tx_ops--;

if (ope) {
/*
* This assertion can fail if an ope is released while it
* still has outstanding TX ops, and the buffer pool slot
* is reused by a new ope (which resets the counter to 0).
* The stale send completion then
* decrements the new ope's counter, causing underflow.
*/
if (ope->efa_outstanding_tx_ops == 0) {
EFA_WARN(FI_LOG_EP_DATA, "decrementing ope->efa_outstanding_tx_ops from 0, pkt type: %d\n", efa_rdm_pkt_type_of(pkt_entry));
assert(ope->efa_outstanding_tx_ops > 0);
}
ope->efa_outstanding_tx_ops--;
switch(efa_rdm_pkt_type_of(pkt_entry)) {
case EFA_RDM_RECEIPT_PKT:
Expand Down
65 changes: 53 additions & 12 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void efa_rdm_txe_release(struct efa_rdm_ope *txe)
* (which would have already removed it from the list).
*/
if (txe->state == EFA_RDM_OPE_SEND &&
!(txe->internal_flags & EFA_RDM_TXE_RECEIPT_RECEIVED))
!(txe->internal_flags & EFA_RDM_TXE_REMOTE_ACK_RECEIVED))
dlist_remove(&txe->entry);

dlist_foreach_container_safe(&txe->queued_pkts,
Expand Down Expand Up @@ -734,7 +734,7 @@ void efa_rdm_txe_handle_error(struct efa_rdm_ope *txe, int err, int prov_errno)
case EFA_RDM_TXE_REQ:
break;
case EFA_RDM_OPE_SEND:
if (!(txe->internal_flags & EFA_RDM_TXE_RECEIPT_RECEIVED))
if (!(txe->internal_flags & EFA_RDM_TXE_REMOTE_ACK_RECEIVED))
dlist_remove(&txe->entry);
break;
case EFA_RDM_OPE_ERR:
Expand Down Expand Up @@ -1013,7 +1013,6 @@ void efa_rdm_txe_report_completion(struct efa_rdm_ope *txe)
txe->peer->conn->fi_addr, txe->tx_id, txe->msg_id,
txe->cq_entry.tag, txe->total_len);


efa_rdm_tracepoint(send_end,
txe->msg_id, (size_t) txe->cq_entry.op_context,
txe->total_len, txe->cq_entry.tag, txe->peer->conn->fi_addr);
Expand Down Expand Up @@ -1188,6 +1187,15 @@ void efa_rdm_ope_handle_recv_completed(struct efa_rdm_ope *ope)
efa_rdm_rxe_report_completion(rxe);
}

/*
* Mark recv completed before any release attempts below.
* This flag is checked by send completion handlers
* (efa_rdm_pke_handle_cts_send_completion for CTS,
* efa_rdm_pke_handle_send_completion for SHORT_RTR/LONGCTS_RTR)
* to decide whether a deferred release should proceed.
*/
ope->internal_flags |= EFA_RDM_OPE_RECV_COMPLETED;

/* As can be seen, this function does not release rxe when
* efa_rdm_ope_post_send_or_queue() was successful.
*
Expand All @@ -1202,6 +1210,17 @@ void efa_rdm_ope_handle_recv_completed(struct efa_rdm_ope *ope)
if (ope->internal_flags & EFA_RDM_TXE_DELIVERY_COMPLETE_REQUESTED) {
assert(ope->type == EFA_RDM_RXE);
rxe = ope; /* Intentionally assigned for easier understanding */
/*
* Set ACK_IN_FLIGHT before posting RECEIPT. This must
* be done before efa_rdm_ope_post_send_or_queue() because
* the RECEIPT may be queued (not immediately posted), in
* which case efa_outstanding_tx_ops is NOT incremented
* yet. Without this flag, a pending CTS send completion
* could see ops == 0 and release the rxe while the
* RECEIPT is still queued, causing a hang.
* Cleared in efa_rdm_pke_handle_receipt_send_completion.
*/
rxe->internal_flags |= EFA_RDM_RXE_ACK_IN_FLIGHT;
err = efa_rdm_ope_post_send_or_queue(rxe, EFA_RDM_RECEIPT_PKT);
if (OFI_UNLIKELY(err)) {
EFA_WARN(FI_LOG_CQ,
Expand All @@ -1219,20 +1238,42 @@ void efa_rdm_ope_handle_recv_completed(struct efa_rdm_ope *ope)
* it is possible that when this function is called, EOR is still inflight
* (EOR has been sent, and the send completion has NOT been received).
*
* If EOR is inflight, the rxe cannot be released because the rxe
* is needed to handle the send completion of the EOR.
* Similarly, a RECEIPT packet may have been posted or queued above
* for DC protocols, setting RXE_ACK_IN_FLIGHT.
*
* see #efa_rdm_pke_handle_eor_send_completion
* In either case, the rxe cannot be released here because it is
* needed to handle the send completion of the EOR or RECEIPT.
*/
if (ope->internal_flags & EFA_RDM_RXE_EOR_IN_FLIGHT) {
if (ope->internal_flags & EFA_RDM_RXE_ACK_IN_FLIGHT) {
/*
* An EOR or RECEIPT is in flight / queued. The rxe
* cannot be released until its send completion arrives.
* The send completion handler will release the rxe.
*
* see #efa_rdm_pke_handle_eor_send_completion
* see #efa_rdm_pke_handle_receipt_send_completion
*/
return;
}

if (ope->type == EFA_RDM_TXE) {
efa_rdm_txe_release(ope);
} else {
assert(ope->type == EFA_RDM_RXE);
efa_rdm_rxe_release(ope);
/*
* Release the ope only if all outstanding TX ops (e.g. CTS,
* RTR send completions) have arrived. If not, the release is
* deferred to the corresponding send completion handler:
*
* - txe (emulated read): RTR or CTS send completion handler
* see #efa_rdm_pke_handle_send_completion (SHORT_RTR/LONGCTS_RTR)
* see #efa_rdm_pke_handle_cts_send_completion (CTS)
*
* - rxe (longcts msg/write): CTS send completion handler
* see #efa_rdm_pke_handle_cts_send_completion
*/
if (ope->efa_outstanding_tx_ops == 0) {
if (ope->type == EFA_RDM_TXE) {
efa_rdm_txe_release(ope);
} else {
assert(ope->type == EFA_RDM_RXE);
efa_rdm_rxe_release(ope);
}
}
}

Expand Down
110 changes: 89 additions & 21 deletions prov/efa/src/rdm/efa_rdm_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,19 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
#define EFA_RDM_OPE_QUEUED_RNR BIT_ULL(9)

/**
* @brief Flag to indicate an rxe has an EOR in flight
* @brief Flag to indicate an rxe has an EOR or RECEIPT in flight.
*
* In flag means the EOR has been sent or queued, and has not got send completion.
* hence the rxe cannot be released
* This flag is set when an EOR or RECEIPT packet has been sent or
* queued but its send completion has not yet arrived. While set,
* the rxe cannot be released.
*
* For EOR: set in efa_rdm_pke_handle_rma_read_completion,
* cleared in efa_rdm_pke_handle_eor_send_completion.
* For RECEIPT: set in efa_rdm_ope_handle_recv_completed (before
* posting RECEIPT), cleared in
* efa_rdm_pke_handle_receipt_send_completion.
*/
#define EFA_RDM_RXE_EOR_IN_FLIGHT BIT_ULL(10)
#define EFA_RDM_RXE_ACK_IN_FLIGHT BIT_ULL(10)

/**
* @brief flag to indicate a txe has already written a CQ error entry for RNR
Expand Down Expand Up @@ -272,13 +279,18 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
#define EFA_RDM_OPE_INTERNAL BIT_ULL(15)

/**
* @brief flag to indicate that a DC txe has received its receipt packet
* @brief Flag to indicate that a txe has received a remote
* acknowledgment (RECEIPT or ATOMRSP).
*
* For DC protocols: set in efa_rdm_pke_handle_receipt_recv when
* the RECEIPT packet arrives from the remote.
* For fetch/compare atomics: set in efa_rdm_pke_handle_atomrsp_recv
* when the ATOMRSP packet arrives.
*
* This flag is used to track when a delivery complete operation has
* received acknowledgment from the receiver, preventing premature
* completion before all TX operations finish.
* The txe can only be released when both this flag is set AND all
* outstanding TX ops have completed (efa_outstanding_tx_ops == 0).
*/
#define EFA_RDM_TXE_RECEIPT_RECEIVED BIT_ULL(16)
#define EFA_RDM_TXE_REMOTE_ACK_RECEIVED BIT_ULL(16)

/**
* @brief flag to indicate an ope does not need to report completion to user
Expand All @@ -294,6 +306,16 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
*/
#define EFA_RDM_TXE_NO_COUNTER BIT_ULL(18)

/**
* @brief flag to indicate that efa_rdm_ope_handle_recv_completed was called.
*
* For txe: this means an emulated read protocol received all data from
* the remote. For rxe: this means all data has been received and copied
* to the application buffer. The ope can only be released when both
* this flag is set AND all outstanding TX ops have completed.
*/
#define EFA_RDM_OPE_RECV_COMPLETED BIT_ULL(19)

#define EFA_RDM_OPE_QUEUED_FLAGS (EFA_RDM_OPE_QUEUED_RNR | EFA_RDM_OPE_QUEUED_CTRL | EFA_RDM_OPE_QUEUED_READ | EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE)

void efa_rdm_ope_try_fill_desc(struct efa_rdm_ope *ope, int mr_iov_start, uint64_t access);
Expand All @@ -318,24 +340,70 @@ void efa_rdm_ope_handle_recv_completed(struct efa_rdm_ope *ope);
void efa_rdm_ope_handle_send_completed(struct efa_rdm_ope *ope);

/**
* @brief Check if a delivery complete (DC) TXE is ready for release
* @brief Check if a DC or atomic txe is ready for release.
*
* Used by: DC eager/medium/longcts msg/tag/write, DC CTSDATA,
* FETCH_RTA, COMPARE_RTA.
*
* These protocols require a remote acknowledgment (RECEIPT for DC,
* ATOMRSP for fetch/compare atomics) before the txe can complete.
* The txe is only released when both:
* 1. The remote ack has arrived (EFA_RDM_TXE_REMOTE_ACK_RECEIVED set)
* 2. All packet send completions have arrived (efa_outstanding_tx_ops == 0)
*
* @param[in] txe TX operation entry to check
* @return true if txe is ready for release, false otherwise
*/
static inline bool efa_rdm_txe_with_remote_ack_ready_for_release(struct efa_rdm_ope *txe)
{
	/* Some packet send completions have not arrived yet. */
	if (txe->efa_outstanding_tx_ops != 0)
		return false;

	/* All sends are done; ready only if the remote ack was also seen. */
	return (txe->internal_flags & EFA_RDM_TXE_REMOTE_ACK_RECEIVED) != 0;
}

/**
* @brief Check if a longcts rxe is ready for release.
*
* Used by: CTS send completion handler for longcts msg/write
* (both DC and non-DC).
*
* The rxe can only be released when all of:
* 1. Recv completed (EFA_RDM_OPE_RECV_COMPLETED set)
* 2. All send completions arrived (efa_outstanding_tx_ops == 0)
* 3. For DC: no ack packet (RECEIPT) is in flight
* (EFA_RDM_RXE_ACK_IN_FLIGHT clear). The ACK_IN_FLIGHT check
* is needed because RECEIPT may be queued (not yet posted),
* in which case efa_outstanding_tx_ops has not been
* incremented yet.
*
* @param[in] rxe RX operation entry to check
* @return true if rxe is ready for release, false otherwise
*/
static inline bool efa_rdm_rxe_cts_ready_for_release(struct efa_rdm_ope *rxe)
{
	/* Send completions are still pending for this rxe. */
	if (rxe->efa_outstanding_tx_ops != 0)
		return false;

	/* Recv has not been marked completed yet. */
	if (!(rxe->internal_flags & EFA_RDM_OPE_RECV_COMPLETED))
		return false;

	/* An ack packet that is still in flight or queued keeps the rxe alive. */
	return !(rxe->internal_flags & EFA_RDM_RXE_ACK_IN_FLIGHT);
}

/**
* @brief Check if an emulated read txe is ready for release.
*
* @details
* For DC packets, this function prevents use-after-free race conditions by
* ensuring the TXE is only released when both conditions are met:
* 1. All TX operations have completed (efa_outstanding_tx_ops == 0)
* 2. Receipt packet has been received (EFA_RDM_TXE_RECEIPT_RECEIVED flag set)
* Used by: SHORT_RTR and LONGCTS_RTR send completion handlers,
* and CTS send completion handler for emulated longcts read.
*
* This dual-condition check ensures proper synchronization between send
* completions and receipt acknowledgments in the delivery complete protocol.
* In emulated read, the txe posts an RTR (or CTS for longcts read)
* and then receives data from the remote. The txe can only be
* released when both:
* 1. Recv completed (EFA_RDM_OPE_RECV_COMPLETED set)
* 2. All send completions arrived (efa_outstanding_tx_ops == 0)
*
* @param[in] txe TX operation entry to check
* @return true if TXE is ready for release, false otherwise
* @return true if txe is ready for release, false otherwise
*/
static inline bool efa_rdm_txe_dc_ready_for_release(struct efa_rdm_ope *txe)
static inline bool efa_rdm_txe_emulated_read_ready_for_release(struct efa_rdm_ope *txe)
{
return (txe->efa_outstanding_tx_ops == 0) &&
(txe->internal_flags & EFA_RDM_TXE_RECEIPT_RECEIVED);
return (txe->efa_outstanding_tx_ops == 0 &&
(txe->internal_flags & EFA_RDM_OPE_RECV_COMPLETED));
}

int efa_rdm_ope_prepare_to_post_read(struct efa_rdm_ope *ope);
Expand Down
24 changes: 14 additions & 10 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
efa_rdm_txe_release(pkt_entry->ope);
break;
case EFA_RDM_CTS_PKT:
efa_rdm_pke_handle_cts_send_completion(pkt_entry);
break;
case EFA_RDM_CTSDATA_PKT:
efa_rdm_pke_handle_ctsdata_send_completion(pkt_entry);
Expand Down Expand Up @@ -621,20 +622,23 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
break;
case EFA_RDM_SHORT_RTR_PKT:
case EFA_RDM_LONGCTS_RTR_PKT:
/* Unlike other protocol, for emulated read, txe
* is released in efa_rdm_ope_handle_recv_completed().
* Therefore there is nothing to be done here.
/* For emulated read, the txe is released either here
* or in efa_rdm_ope_handle_recv_completed(), whichever
* happens last. Release here if recv already completed.
*/
assert(pkt_entry->ope);
if (efa_rdm_txe_emulated_read_ready_for_release(pkt_entry->ope))
efa_rdm_txe_release(pkt_entry->ope);
break;
case EFA_RDM_WRITE_RTA_PKT:
efa_rdm_pke_handle_write_rta_send_completion(pkt_entry);
break;
case EFA_RDM_FETCH_RTA_PKT:
/* no action to be taken here */
break;
case EFA_RDM_COMPARE_RTA_PKT:
/* no action to be taken here */
break;
case EFA_RDM_FETCH_RTA_PKT: /* fall through */
case EFA_RDM_COMPARE_RTA_PKT: /* fall through */
/* For fetch/compare atomics, the txe is released either
* here or in efa_rdm_pke_handle_atomrsp_recv(), whichever
* happens last. Release here if ATOMRSP already arrived.
*/
case EFA_RDM_DC_EAGER_MSGRTM_PKT:
case EFA_RDM_DC_EAGER_TAGRTM_PKT:
case EFA_RDM_DC_MEDIUM_MSGRTM_PKT:
Expand All @@ -651,7 +655,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
* Only release TXE when both TX ops complete and the remote ack
* (RECEIPT for DC, ATOMRSP for fetch/compare atomics) is received.
*/
assert(pkt_entry->ope);
if (efa_rdm_txe_dc_ready_for_release(pkt_entry->ope))
if (efa_rdm_txe_with_remote_ack_ready_for_release(pkt_entry->ope))
efa_rdm_txe_release(pkt_entry->ope);
break;
case EFA_RDM_READ_NACK_PKT:
Expand Down
Loading
Loading