Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ void efa_rdm_ep_record_tx_op_completed(struct efa_rdm_ep *ep, struct efa_rdm_pke
pkt_entry->peer->efa_outstanding_tx_ops--;

if (ope) {
assert(ope->efa_outstanding_tx_ops > 0);
ope->efa_outstanding_tx_ops--;
switch(efa_rdm_pkt_type_of(pkt_entry)) {
case EFA_RDM_RECEIPT_PKT:
Expand Down
18 changes: 14 additions & 4 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ void efa_rdm_txe_release(struct efa_rdm_ope *txe)
* (which would have already removed it from the list).
*/
if (txe->state == EFA_RDM_OPE_SEND &&
!(txe->internal_flags & EFA_RDM_TXE_RECEIPT_RECEIVED))
!(txe->internal_flags & EFA_RDM_TXE_RESPONSE_RECEIVED))
dlist_remove(&txe->entry);

dlist_foreach_container_safe(&txe->queued_pkts,
Expand Down Expand Up @@ -732,7 +732,7 @@ void efa_rdm_txe_handle_error(struct efa_rdm_ope *txe, int err, int prov_errno)
case EFA_RDM_TXE_REQ:
break;
case EFA_RDM_OPE_SEND:
if (!(txe->internal_flags & EFA_RDM_TXE_RECEIPT_RECEIVED))
if (!(txe->internal_flags & EFA_RDM_TXE_RESPONSE_RECEIVED))
dlist_remove(&txe->entry);
break;
case EFA_RDM_OPE_ERR:
Expand Down Expand Up @@ -982,7 +982,6 @@ void efa_rdm_txe_report_completion(struct efa_rdm_ope *txe)
txe->peer->conn->fi_addr, txe->tx_id, txe->msg_id,
txe->cq_entry.tag, txe->total_len);


efa_rdm_tracepoint(send_end,
txe->msg_id, (size_t) txe->cq_entry.op_context,
txe->total_len, txe->cq_entry.tag, txe->peer->conn->fi_addr);
Expand Down Expand Up @@ -1198,7 +1197,18 @@ void efa_rdm_ope_handle_recv_completed(struct efa_rdm_ope *ope)
}

if (ope->type == EFA_RDM_TXE) {
efa_rdm_txe_release(ope);
/*
* This can only happen for emulated read protocols,
* where we use a TX entry to receive data from the read
* target. When the recv completes, the RTR send
* completion may not have arrived yet. Defer the release
* until the RTR send completion arrives
* (efa_outstanding_tx_ops == 0) to avoid a use-after-free
* of the TX entry.
*/
ope->internal_flags |= EFA_RDM_TXE_RESPONSE_RECEIVED;;
if (efa_rdm_txe_with_resp_ready_for_release(ope))
efa_rdm_txe_release(ope);
} else {
assert(ope->type == EFA_RDM_RXE);
efa_rdm_rxe_release(ope);
Expand Down
34 changes: 19 additions & 15 deletions prov/efa/src/rdm/efa_rdm_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,19 @@ void efa_rdm_rxe_release_internal(struct efa_rdm_ope *rxe);
#define EFA_RDM_OPE_INTERNAL BIT_ULL(15)

/**
* @brief flag to indicate that a DC txe has received its receipt packet
* @brief flag to indicate that a txe has received its response/ack
*
* This flag is used to track when a delivery complete operation has
* received acknowledgment from the receiver, preventing premature
* completion before all TX operations finish.
* This applies to protocols where the txe sends a request packet and
* needs to wait for both a response AND all TX send completions before
* the txe can be released or completed:
* - Delivery complete: REQ sent, RECEIPT received
* - Emulated read: RTR sent, data received via READRSP/CTSDATA
* - Fetch/compare atomics: FETCH_RTA/COMPARE_RTA sent, ATOMRSP received
*
* The txe cannot be released/completed until both the response has been
* received AND all outstanding TX ops have completed.
*/
#define EFA_RDM_TXE_RECEIPT_RECEIVED BIT_ULL(16)
#define EFA_RDM_TXE_RESPONSE_RECEIVED BIT_ULL(16)

#define EFA_RDM_OPE_QUEUED_FLAGS (EFA_RDM_OPE_QUEUED_RNR | EFA_RDM_OPE_QUEUED_CTRL | EFA_RDM_OPE_QUEUED_READ | EFA_RDM_OPE_QUEUED_BEFORE_HANDSHAKE)

Expand All @@ -317,24 +323,22 @@ void efa_rdm_ope_handle_recv_completed(struct efa_rdm_ope *ope);
void efa_rdm_ope_handle_send_completed(struct efa_rdm_ope *ope);

/**
* @brief Check if a delivery complete (DC) TXE is ready for release
* @brief Check if a txe that received its response/ack is ready for release
*
* @details
* For DC packets, this function prevents use-after-free race conditions by
* ensuring the TXE is only released when both conditions are met:
* 1. All TX operations have completed (efa_outstanding_tx_ops == 0)
* 2. Receipt packet has been received (EFA_RDM_TXE_RECEIPT_RECEIVED flag set)
*
* This dual-condition check ensures proper synchronization between send
* completions and receipt acknowledgments in the delivery complete protocol.
* In protocols where the txe sends a request and receives a response
* (delivery complete, emulated read, fetch/compare atomics), the txe
* can only be released or completed when both conditions hold:
* 1. The response has been received (EFA_RDM_TXE_RESPONSE_RECEIVED flag set)
* 2. All TX ops have completed (efa_outstanding_tx_ops == 0)
*
* @param[in] txe TX operation entry to check
* @return true if TXE is ready for release, false otherwise
*/
static inline bool efa_rdm_txe_dc_ready_for_release(struct efa_rdm_ope *txe)
static inline bool efa_rdm_txe_with_resp_ready_for_release(struct efa_rdm_ope *txe)
{
return (txe->efa_outstanding_tx_ops == 0) &&
(txe->internal_flags & EFA_RDM_TXE_RECEIPT_RECEIVED);
(txe->internal_flags & EFA_RDM_TXE_RESPONSE_RECEIVED);
}

int efa_rdm_ope_prepare_to_post_read(struct efa_rdm_ope *ope);
Expand Down
30 changes: 17 additions & 13 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -621,21 +621,25 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
case EFA_RDM_LONGREAD_RTW_PKT:
/* nothing to do when long rtw send completes*/
break;
case EFA_RDM_SHORT_RTR_PKT:
case EFA_RDM_LONGCTS_RTR_PKT:
/* Unlike other protocol, for emulated read, txe
* is released in efa_rdm_ope_handle_recv_completed().
* Therefore there is nothing to be done here.
*/
break;
case EFA_RDM_WRITE_RTA_PKT:
efa_rdm_pke_handle_write_rta_send_completion(pkt_entry);
break;
case EFA_RDM_SHORT_RTR_PKT:
case EFA_RDM_LONGCTS_RTR_PKT:
/* For emulated read, txe is normally released in
* efa_rdm_ope_handle_recv_completed(). However, if recv
* completed before this RTR send completion arrived, the
* release was deferred. Release the txe now.
*/
case EFA_RDM_FETCH_RTA_PKT:
/* no action to be taken here */
break;
case EFA_RDM_COMPARE_RTA_PKT:
/* no action to be taken here */
/* For fetch/compare atomics, txe is normally released in
* efa_rdm_pke_handle_atomrsp_recv(). However, if the ATOMRSP
* arrived before this send completion, the release was deferred.
*/
assert(pkt_entry->ope);
if (efa_rdm_txe_with_resp_ready_for_release(pkt_entry->ope))
efa_rdm_txe_release(pkt_entry->ope);
break;
case EFA_RDM_DC_EAGER_MSGRTM_PKT:
case EFA_RDM_DC_EAGER_TAGRTM_PKT:
Expand All @@ -650,11 +654,11 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
* instead of bytes_acked to avoid issues with unset payload_size.
* Note: efa_rdm_ep_record_tx_op_completed() above decrements efa_outstanding_tx_ops,
* so this check must come after that call.
* Only release TXE when both TX ops complete and receipt is received.
* Only complete the TXE when both TX ops complete and receipt is received.
*/
assert(pkt_entry->ope);
if (efa_rdm_txe_dc_ready_for_release(pkt_entry->ope))
efa_rdm_txe_release(pkt_entry->ope);
if (efa_rdm_txe_with_resp_ready_for_release(pkt_entry->ope))
efa_rdm_ope_handle_send_completed(pkt_entry->ope);
break;
case EFA_RDM_READ_NACK_PKT:
/* no action needed for NACK packet */
Expand Down
33 changes: 19 additions & 14 deletions prov/efa/src/rdm/efa_rdm_pke_nonreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,16 @@ void efa_rdm_pke_handle_ctsdata_send_completion(struct efa_rdm_pke *pkt_entry)
{
struct efa_rdm_ope *ope;

/* if this DATA packet is used by a DC protocol, the completion
* was (or will be) written when the receipt packet was received.
* The txe may have already been released. So nothing
* to do (or can be done) here.
/* If this DATA packet is used by a DC protocol, the tx entry should
* only be completed once all TX ops are done and the receipt
* has been received.
*/
if (pkt_entry->flags & EFA_RDM_PKE_DC_LONGCTS_DATA)
if (pkt_entry->flags & EFA_RDM_PKE_DC_LONGCTS_DATA) {
assert(pkt_entry->ope);
if (efa_rdm_txe_with_resp_ready_for_release(pkt_entry->ope))
efa_rdm_ope_handle_send_completed(pkt_entry->ope);
return;
}

ope = pkt_entry->ope;
ope->bytes_acked += efa_rdm_pke_get_ctsdata_hdr(pkt_entry)->seg_length;
Expand Down Expand Up @@ -783,20 +786,17 @@ void efa_rdm_pke_handle_receipt_recv(struct efa_rdm_pke *pkt_entry)
return;
}

/* Write send completion immediately to preserve DC semantics */
efa_rdm_txe_report_completion(txe);

/* Remove from ope_longcts_send_list since operation is complete */
/* Remove from ope_longcts_send_list since all the data has been delivered */
if (txe->state == EFA_RDM_OPE_SEND) {
dlist_remove(&txe->entry);
}

/* Set receipt received flag for DC operations */
txe->internal_flags |= EFA_RDM_TXE_RECEIPT_RECEIVED;
txe->internal_flags |= EFA_RDM_TXE_RESPONSE_RECEIVED;

/* Only release txe if both conditions are met */
if (efa_rdm_txe_dc_ready_for_release(txe))
efa_rdm_txe_release(txe);
/* Only complete txe if both conditions are met */
if (efa_rdm_txe_with_resp_ready_for_release(txe))
efa_rdm_ope_handle_send_completed(txe);

efa_rdm_pke_release_rx(pkt_entry);
}
Expand Down Expand Up @@ -866,6 +866,11 @@ void efa_rdm_pke_handle_atomrsp_recv(struct efa_rdm_pke *pkt_entry)
else
efa_cntr_report_tx_completion(&pkt_entry->ep->base_ep.util_ep, txe->cq_entry.flags);

efa_rdm_txe_release(txe);
/* Defer txe release until the FETCH_RTA/COMPARE_RTA send completion
* arrives to avoid use-after-free if the buffer pool slot is reused.
*/
txe->internal_flags |= EFA_RDM_TXE_RESPONSE_RECEIVED;
if (efa_rdm_txe_with_resp_ready_for_release(txe))
efa_rdm_txe_release(txe);
efa_rdm_pke_release_rx(pkt_entry);
}
1 change: 1 addition & 0 deletions prov/efa/test/efa_unit_test_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ void test_rdm_cq_handshake_bad_send_status_impl(struct efa_resource **state, int
txe = efa_unit_test_alloc_txe(resource, ofi_op_msg);
assert_non_null(txe);
txe->internal_flags |= EFA_RDM_OPE_INTERNAL;
txe->efa_outstanding_tx_ops = 1;
pkt_entry->ope = txe;
pkt_entry->peer = peer;

Expand Down
1 change: 1 addition & 0 deletions prov/efa/test/efa_unit_test_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -2109,6 +2109,7 @@ void test_efa_rdm_ep_outstanding_tx_ops_decremented_with_error_completion(struct
txe = efa_unit_test_alloc_txe(resource, ofi_op_msg);
assert_non_null(txe);
txe->internal_flags |= EFA_RDM_OPE_INTERNAL;
txe->efa_outstanding_tx_ops = 1;
pkt_entry->ope = txe;
pkt_entry->peer = peer;

Expand Down
Loading
Loading