Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libfabric.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,8 @@
<ClCompile Include="prov\efa\src\rdm\efa_rdm_pke_nonreq.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_pke_req.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_pke_rtm.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_proto.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_proto_eager.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_pke_rta.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_pke_rtw.c" />
<ClCompile Include="prov\efa\src\rdm\efa_rdm_pke_rtr.c" />
Expand Down Expand Up @@ -1025,6 +1027,8 @@
<ClInclude Include="prov\efa\src\rdm\efa_rdm_cntr.h" />
<ClInclude Include="prov\efa\src\rdm\efa_rdm_pke_utils.h" />
<ClInclude Include="prov\efa\src\rdm\efa_rdm_pke_nonreq.h" />
<ClInclude Include="prov\efa\src\rdm\efa_rdm_proto.h" />
<ClInclude Include="prov\efa\src\rdm\efa_rdm_proto_eager.h" />
<ClInclude Include="prov\efa\src\rdm\efa_rdm_atomic.h" />
<ClInclude Include="prov\efa\src\rdm\efa_rdm_msg.h" />
<ClInclude Include="prov\efa\src\rdm\efa_rdm_pke.h" />
Expand Down
11 changes: 8 additions & 3 deletions prov/efa/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ _efa_files = \
prov/efa/src/rdm/efa_rdm_tracepoint_def.c \
prov/efa/src/rdm/efa_rdm_srx.c \
prov/efa/src/rdm/efa_rdm_util.c \
prov/efa/src/rdm/efa_rdm_mr.c
prov/efa/src/rdm/efa_rdm_mr.c \
prov/efa/src/rdm/efa_rdm_proto.c \
prov/efa/src/rdm/efa_rdm_proto_eager.c

if ENABLE_EFA_UNIT_TEST
_efa_files += prov/efa/test/efa_unit_test_data_path_ops.c
Expand Down Expand Up @@ -142,7 +144,9 @@ _efa_headers = \
prov/efa/src/rdm/efa_rdm_tracepoint.h \
prov/efa/src/rdm/efa_rdm_srx.h \
prov/efa/src/rdm/efa_rdm_util.h \
prov/efa/src/rdm/efa_rdm_mr.h
prov/efa/src/rdm/efa_rdm_mr.h \
prov/efa/src/rdm/efa_rdm_proto.h \
prov/efa/src/rdm/efa_rdm_proto_eager.h

if HAVE_LTTNG
efa_LDFLAGS += -llttng-ust
Expand Down Expand Up @@ -176,7 +180,8 @@ nodist_prov_efa_test_efa_unit_test_SOURCES = \
prov/efa/test/efa_unit_test_msg.c \
prov/efa/test/efa_unit_test_rma.c \
prov/efa/test/efa_unit_test_rdm_rma.c \
prov/efa/test/efa_unit_test_data_path_direct.c
prov/efa/test/efa_unit_test_data_path_direct.c \
prov/efa/test/efa_unit_test_proto.c


efa_CPPFLAGS += -I$(top_srcdir)/include -I$(top_srcdir)/prov/efa/test $(cmocka_CPPFLAGS)
Expand Down
32 changes: 30 additions & 2 deletions prov/efa/src/rdm/efa_rdm_cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,36 @@ enum ibv_wc_status efa_rdm_cq_process_wc(struct efa_ibv_cq *cq, struct efa_rdm_e
#if ENABLE_DEBUG
ep->send_comps++;
#endif
efa_rdm_pke_handle_send_completion(pkt_entry);
efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
if (pkt_entry->handle_pke) {
efa_rdm_ep_record_tx_op_completed(pkt_entry->ep,
pkt_entry);
/*
* For a send completion, pkt_entry->peer can be
* NULL in 3 situations:
* 1. the pkt_entry is used for a local read
* operation
* 2. a new peer with same gid+qpn was inserted
* to av, thus the peer was removed from AV.
* 3. application removed the peer's address
* from av. In 1, we should proceed. For 2 and
* 3, the send completion should be ignored.
*/
if (!pkt_entry->peer &&
!(pkt_entry->flags &
EFA_RDM_PKE_LOCAL_READ)) {
EFA_WARN(
FI_LOG_CQ,
"ignoring send completion of a "
"packet to a removed peer.\n");
efa_rdm_pke_release_tx(pkt_entry);
} else {
pkt_entry->handle_pke(pkt_entry);
}
efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to make efa_rdm_cq_increment_pkt_entry_gen part of the callback?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered that. The callback is part of the protocol. The gen counter is used by the progress engine to keep track of duplicated completions. So I felt that it's better to leave the gen counter increment here

} else {
efa_rdm_pke_handle_send_completion(pkt_entry);
efa_rdm_cq_increment_pkt_entry_gen(pkt_entry);
}
break;
case IBV_WC_RECV:
/* efa_rdm_cq_handle_recv_completion does additional work to determine the source
Expand Down
77 changes: 75 additions & 2 deletions prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "efa_rdm_pke_utils.h"
#include "efa_rdm_pke_req.h"

#include "efa_mr.h"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we only activate the new code in this commit, should the earlier assertions in the switch cases be added only in this commit?

#include "efa_rdm_proto.h"
#include "efa_rdm_proto_eager.h"
#include "efa_rdm_tracepoint.h"

/**
Expand Down Expand Up @@ -102,14 +105,42 @@ int efa_rdm_msg_select_rtm(struct efa_rdm_ep *efa_rdm_ep, struct efa_rdm_ope *tx
}

/**
 * @brief Post RTM packet(s) for an already-filled TXE using the new
 *        protocol path.
 *
 * Used by the retry path after handshake completes and by the normal
 * send path. The TXE must already be filled by efa_rdm_proto_txe_fill.
 *
 * @param[in,out]	ep	endpoint whose send_pkt_entry_vec is posted
 * @param[in,out]	txe	TX entry describing the send operation
 * @param[in]		proto	protocol supplying the construct_tx_pkes and
 *				handle_tx_pkes_posted callbacks
 * @retval FI_SUCCESS if the packets were constructed and posted.
 * @retval non-zero error returned by proto->construct_tx_pkes or by
 *         efa_rdm_pke_sendv; in that case handle_tx_pkes_posted is
 *         not invoked.
 */
ssize_t efa_rdm_msg_post_rtm_proto(struct efa_rdm_ep *ep,
				   struct efa_rdm_ope *txe,
				   struct efa_rdm_proto *proto)
{
	/* No extra send flags are requested on this path. */
	const uint64_t send_flags = 0;
	ssize_t ret;

	/*
	 * Step 1: let the protocol build its packet entries. No fi_msg is
	 * passed (NULL); every other argument is taken from the TXE.
	 * NOTE(review): presumably this populates ep->send_pkt_entry_vec,
	 * which is what gets posted below -- confirm against the protocol
	 * implementation.
	 */
	ret = proto->construct_tx_pkes(ep, txe->peer, NULL, txe->op,
				       txe->tag, txe->fi_flags,
				       txe->internal_flags, txe);
	if (ret != 0)
		return ret;

	/* Step 2: hand the endpoint's packet vector to the send routine. */
	ret = efa_rdm_pke_sendv(ep->send_pkt_entry_vec,
				ep->send_pkt_entry_vec_size,
				send_flags);
	if (ret != 0)
		return ret;

	/* Step 3: tell the protocol that its packets have been posted. */
	proto->handle_tx_pkes_posted(ep, txe);

	return FI_SUCCESS;
}

/**
* @brief Post a RTM packet for a TX entry using the old code path.
*
* @param[in,out] ep endpoint
* @param[in,out] txe information of the send operation.
* @retval 0 if packet(s) was posted successfully.
* @retval -FI_ENOSUPP if the send operation requires an extra feature,
* which peer does not support.
* @retval -FI_EAGAIN for temporary out of resources for send
*/
ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe)
{
Expand Down Expand Up @@ -168,6 +199,7 @@ ssize_t efa_rdm_msg_generic_send(struct efa_rdm_ep *ep, const struct fi_msg *msg
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
size_t available_tx_pkts;
struct efa_rdm_proto *proto;

efa_rdm_tracepoint(send_begin_msg_context,
(size_t) msg->context, (size_t) msg->addr);
Expand Down Expand Up @@ -196,6 +228,47 @@ ssize_t efa_rdm_msg_generic_send(struct efa_rdm_ep *ep, const struct fi_msg *msg
goto out;
}

/* First try to use the refactored code path */
err = efa_rdm_proto_select_send_protocol(ep, peer, msg, op, fi_flags, txe,
&proto);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please split the new and old paths into two different functions.

if (err)
goto out;

/* If a protocol is found, use it. Otherwise, fall back to the old code
* path */
if (proto) {
efa_rdm_proto_txe_fill(txe, ep, peer, msg, op, tag, fi_flags,
internal_flags);
txe->msg_id = peer->next_msg_id++;

/*
* For backwards compatibility: if the peer may have zero-copy
* receive enabled, we must complete handshake before sending so
* we can discover the peer's user_recv_qp and route packets
* accordingly.
*/
if (ep->peer_may_have_zcpy_rx &&
!(peer->flags & EFA_RDM_PEER_HANDSHAKE_RECEIVED)) {
err = efa_rdm_ep_enforce_handshake_for_txe(ep, txe);
if (err) {
efa_rdm_txe_release(txe);
peer->next_msg_id--;
}
goto out;
}

err = efa_rdm_msg_post_rtm_proto(ep, txe, proto);
if (err) {
efa_rdm_txe_release(txe);
peer->next_msg_id--;
goto out;
}

peer->flags |= EFA_RDM_PEER_REQ_SENT;
goto out;
}

/* Fallback to the old code path */
efa_rdm_txe_construct(txe, ep, peer, msg, op, fi_flags, internal_flags);
if (op == ofi_op_tagged) {
txe->cq_entry.tag = tag;
Expand Down
27 changes: 27 additions & 0 deletions prov/efa/src/rdm/efa_rdm_msg.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */

struct efa_rdm_proto;
static inline
void efa_rdm_msg_construct(struct fi_msg *msg, const struct iovec *iov, void **desc,
size_t count, fi_addr_t addr, void *context, uint64_t data)
Expand Down Expand Up @@ -29,6 +30,32 @@ struct efa_rdm_ope *efa_rdm_msg_split_rxe(struct efa_rdm_ep *ep,
struct efa_rdm_ope *consumer_entry,
struct efa_rdm_pke *pkt_entry);
ssize_t efa_rdm_msg_post_rtm(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe);

/**
 * @brief Compute the effective fi_flags for a TX operation.
 *
 * The result is the per-operation flags OR-ed with the endpoint's
 * tx_op_flags. When the endpoint's tx_msg_flags is 0, FI_COMPLETION is
 * stripped from the endpoint's tx_op_flags before merging; an
 * FI_COMPLETION bit supplied by the caller in fi_flags is always kept.
 *
 * @param[in]	ep		RDM endpoint
 * @param[in]	fi_flags	Per-operation flags from the application
 * @return	Merged flags to use for the TX operation
 */
static inline
uint64_t efa_rdm_msg_get_tx_flags(struct efa_rdm_ep *ep, uint64_t fi_flags)
{
	const uint64_t ep_op_flags = ep->base_ep.util_ep.tx_op_flags;

	/* tx_msg_flags may only be 0 or exactly FI_COMPLETION. */
	assert(ep->base_ep.util_ep.tx_msg_flags == 0 ||
	       ep->base_ep.util_ep.tx_msg_flags == FI_COMPLETION);

	/* Endpoint op flags are merged unchanged when tx_msg_flags is set. */
	if (ep->base_ep.util_ep.tx_msg_flags != 0)
		return fi_flags | ep_op_flags;

	/* Otherwise FI_COMPLETION must not come from the endpoint flags. */
	return fi_flags | (ep_op_flags & ~FI_COMPLETION);
}

ssize_t efa_rdm_msg_post_rtm_proto(struct efa_rdm_ep *ep, struct efa_rdm_ope *txe,
struct efa_rdm_proto *proto);
/*
* The following 2 OP structures are defined in efa_rdm_msg.c and are
* used by #efa_rdm_ep_open()
Expand Down
4 changes: 4 additions & 0 deletions prov/efa/src/rdm/efa_rdm_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ void efa_rdm_txe_construct(struct efa_rdm_ope *txe,
txe->efa_outstanding_tx_ops = 0;
dlist_init(&txe->queued_pkts);

txe->proto = NULL;

memcpy(txe->iov, msg->msg_iov, sizeof(struct iovec) * msg->iov_count);
memset(txe->mr, 0, sizeof(*txe->mr) * msg->iov_count);
if (msg->desc) {
Expand Down Expand Up @@ -1956,6 +1958,8 @@ ssize_t efa_rdm_ope_repost_ope_queued_before_handshake(struct efa_rdm_ope *ope)
switch (ope->op) {
case ofi_op_msg: /* fall through */
case ofi_op_tagged:
if (ope->proto)
return efa_rdm_msg_post_rtm_proto(ope->ep, ope, ope->proto);
return efa_rdm_msg_post_rtm(ope->ep, ope);
case ofi_op_write:
return efa_rdm_rma_post_write(ope->ep, ope);
Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ struct efa_rdm_ope {

struct efa_rdm_ep *ep;
struct efa_rdm_peer *peer;
struct efa_rdm_proto *proto; /**< protocol used by the refactored send path; NULL in the old code path */

uint32_t tx_id;
uint32_t rx_id;
Expand Down
1 change: 1 addition & 0 deletions prov/efa/src/rdm/efa_rdm_pke.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ struct efa_rdm_pke *efa_rdm_pke_alloc(struct efa_rdm_ep *ep,
pkt_entry->payload_size = 0;
pkt_entry->payload_mr = NULL;
pkt_entry->peer = NULL;
pkt_entry->handle_pke = NULL;

switch (alloc_type) {
case EFA_RDM_PKE_FROM_READ_COPY_POOL:
Expand Down
3 changes: 3 additions & 0 deletions prov/efa/src/rdm/efa_rdm_pke.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ struct efa_rdm_pke {
/**@brief Generation counter. It is incremented every time the packet is posted to rdma-core */
uint8_t gen;

/**@brief Callback function called in TX and RX paths */
void (*handle_pke)(struct efa_rdm_pke *pkt_entry);

#if ENABLE_DEBUG
struct efa_rdm_pke_debug_info_buffer *debug_info; /**< Pointer to debug info buffer */
#endif
Expand Down
29 changes: 9 additions & 20 deletions prov/efa/src/rdm/efa_rdm_pke_cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,8 @@ int efa_rdm_pke_fill_data(struct efa_rdm_pke *pkt_entry,
ret = efa_rdm_pke_init_receipt(pkt_entry, ope);
break;
case EFA_RDM_EAGER_MSGRTM_PKT:
assert(data_offset == 0 && data_size == -1);
ret = efa_rdm_pke_init_eager_msgrtm(pkt_entry, ope);
break;
case EFA_RDM_EAGER_TAGRTM_PKT:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this an opportunity to unite these cases?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This entire function will be removed when the refactor is complete, so I'm not sure it matters

assert(data_offset == 0 && data_size == -1);
ret = efa_rdm_pke_init_eager_tagrtm(pkt_entry, ope);
assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_MEDIUM_MSGRTM_PKT:
assert(data_offset >= 0 && data_size > 0);
Expand Down Expand Up @@ -173,12 +169,8 @@ int efa_rdm_pke_fill_data(struct efa_rdm_pke *pkt_entry,
ret = efa_rdm_pke_init_compare_rta(pkt_entry, ope);
break;
case EFA_RDM_DC_EAGER_MSGRTM_PKT:
assert(data_offset == 0 && data_size == -1);
ret = efa_rdm_pke_init_dc_eager_msgrtm(pkt_entry, ope);
break;
case EFA_RDM_DC_EAGER_TAGRTM_PKT:
assert(data_offset == 0 && data_size == -1);
ret = efa_rdm_pke_init_dc_eager_tagrtm(pkt_entry, ope);
assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_DC_MEDIUM_MSGRTM_PKT:
assert(data_offset >= 0 && data_size > 0);
Expand Down Expand Up @@ -266,7 +258,7 @@ void efa_rdm_pke_handle_sent(struct efa_rdm_pke *pkt_entry, int pkt_type, struct
break;
case EFA_RDM_EAGER_MSGRTM_PKT:
case EFA_RDM_EAGER_TAGRTM_PKT:
/* nothing to do */
assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_MEDIUM_MSGRTM_PKT:
case EFA_RDM_MEDIUM_TAGRTM_PKT:
Expand Down Expand Up @@ -310,8 +302,10 @@ void efa_rdm_pke_handle_sent(struct efa_rdm_pke *pkt_entry, int pkt_type, struct
break;
case EFA_RDM_DC_EAGER_MSGRTM_PKT:
case EFA_RDM_DC_EAGER_TAGRTM_PKT:
assert(0 && "Eager protocol moved to refactored code path");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not packet construction, is it? It seems it should be part of your next commit.

break;
case EFA_RDM_DC_EAGER_RTW_PKT:
/* nothing to do for DC EAGER RTM/RTW */
/* nothing to do for DC EAGER RTW */
break;
case EFA_RDM_CTSDATA_PKT:
efa_rdm_pke_handle_ctsdata_sent(pkt_entry);
Expand Down Expand Up @@ -553,13 +547,6 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
return;
}

/* These pkts are eager pkts without hdrs */
if (pkt_entry->flags & EFA_RDM_PKE_SEND_TO_USER_RECV_QP) {
efa_rdm_pke_handle_eager_rtm_send_completion(pkt_entry);
efa_rdm_pke_release_tx(pkt_entry);
return;
}

/* Start handling pkts with hdrs */
switch (efa_rdm_pkt_type_of(pkt_entry)) {
case EFA_RDM_HANDSHAKE_PKT:
Expand Down Expand Up @@ -592,7 +579,7 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
break;
case EFA_RDM_EAGER_MSGRTM_PKT:
case EFA_RDM_EAGER_TAGRTM_PKT:
efa_rdm_pke_handle_eager_rtm_send_completion(pkt_entry);
assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_MEDIUM_MSGRTM_PKT:
case EFA_RDM_MEDIUM_TAGRTM_PKT:
Expand Down Expand Up @@ -637,6 +624,8 @@ void efa_rdm_pke_handle_send_completion(struct efa_rdm_pke *pkt_entry)
break;
case EFA_RDM_DC_EAGER_MSGRTM_PKT:
case EFA_RDM_DC_EAGER_TAGRTM_PKT:
assert(0 && "Eager protocol moved to refactored code path");
break;
case EFA_RDM_DC_MEDIUM_MSGRTM_PKT:
case EFA_RDM_DC_MEDIUM_TAGRTM_PKT:
case EFA_RDM_DC_EAGER_RTW_PKT:
Expand Down
Loading
Loading