Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions prov/efa/src/efa.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,6 @@
#define EFA_DEFAULT_INTER_MIN_READ_WRITE_SIZE (65536)
#define EFA_DEFAULT_INTRA_MAX_GDRCOPY_FROM_DEV_SIZE (3072)

/*
* The default memory alignment
*/
#define EFA_RDM_DEFAULT_MEMORY_ALIGNMENT (8)

/*
* The CUDA memory alignment
*/
#define EFA_RDM_CUDA_MEMORY_ALIGNMENT (64)

/*
* The alignment to support in-order aligned ops.
*/
#define EFA_RDM_IN_ORDER_ALIGNMENT (128)

/*
* Set alignment to x86 cache line size.
*/
Expand Down
4 changes: 0 additions & 4 deletions prov/efa/src/rdm/efa_rdm_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ efa_rdm_atomic_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
return NULL;
Comment thread
sunkuamzn marked this conversation as resolved.
}

efa_domain_ope_list_lock(efa_rdm_ep_domain(efa_rdm_ep));
dlist_insert_tail(&txe->ep_entry, &efa_rdm_ep->txe_list);
efa_domain_ope_list_unlock(efa_rdm_ep_domain(efa_rdm_ep));

ofi_ioc_to_iov(msg_atomic->msg_iov, iov, msg_atomic->iov_count, datatype_size);
msg.addr = msg_atomic->addr;
msg.msg_iov = iov;
Expand Down
52 changes: 43 additions & 9 deletions prov/efa/src/rdm/efa_rdm_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ struct efa_rdm_ep_queued_copy {

#define EFA_RDM_MAX_QUEUED_COPY (8)

/*
* The default memory alignment
*/
#define EFA_RDM_EP_DEFAULT_MEMORY_ALIGNMENT (8)

/*
* The CUDA memory alignment
*/
#define EFA_RDM_EP_CUDA_MEMORY_ALIGNMENT (64)

/*
* The alignment to support in-order aligned ops.
*/
#define EFA_RDM_EP_IN_ORDER_ALIGNMENT (128)

/**
* Max number of opes queued before handshake is made
* with their peers. This cnt is per EP.
Expand Down Expand Up @@ -195,7 +210,8 @@ struct efa_rdm_ep {
struct efa_rdm_pke **pke_vec;
/* Work arrays for efa_rdm_ope_post_send to avoid stack allocation */
struct efa_rdm_pke **send_pkt_entry_vec;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be cleaner if we created a send_pkt_entry struct to wrap these three fields together?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this. It will add another layer of redirection and I'm not sure if it helps. I did not add a new struct.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a nested struct does not cause an extra layer of indirection. It does, however, help organize this large and messy struct.

int *send_pkt_entry_size_vec;
size_t *send_pkt_entry_vec_data_sizes;
size_t send_pkt_entry_vec_size;
Comment thread
sunkuamzn marked this conversation as resolved.
struct dlist_entry entry;
/* the count of opes queued before handshake is made with their peers */
size_t ope_queued_before_handshake_cnt;
Expand All @@ -220,13 +236,6 @@ struct efa_rdm_peer *efa_rdm_ep_get_peer_explicit(struct efa_rdm_ep *ep, fi_addr
int32_t efa_rdm_ep_get_peer_ahn(struct efa_rdm_ep *ep, fi_addr_t addr);
struct efa_rdm_peer *efa_rdm_ep_get_peer_implicit(struct efa_rdm_ep *ep, fi_addr_t addr);

struct efa_rdm_ope *efa_rdm_ep_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
struct efa_rdm_peer *peer,
const struct fi_msg *msg,
uint32_t op,
uint64_t tag,
uint64_t flags);

struct efa_rdm_ope *efa_rdm_ep_alloc_rxe(struct efa_rdm_ep *ep,
struct efa_rdm_peer *peer, uint32_t op);

Expand All @@ -253,7 +262,26 @@ void efa_rdm_ep_queue_rnr_pkt(struct efa_rdm_ep *ep, struct efa_rdm_pke *pkt_ent
ssize_t efa_rdm_ep_post_queued_pkts(struct efa_rdm_ep *ep,
struct dlist_entry *pkts);

size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface iface);
/**
 * @brief Get the memory alignment required for the given ep and hmem iface.
 *
 * In-order aligned 128-byte send/recv support takes precedence; otherwise
 * CUDA memory gets its own alignment, and everything else falls back to
 * the default.
 *
 * @param ep efa rdm ep
 * @param iface hmem iface
 * @return size_t the memory alignment in bytes
 */
static inline size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep,
						     enum fi_hmem_iface iface)
{
	if (ep->sendrecv_in_order_aligned_128_bytes)
		return EFA_RDM_EP_IN_ORDER_ALIGNMENT;

	if (iface == FI_HMEM_CUDA)
		return EFA_RDM_EP_CUDA_MEMORY_ALIGNMENT;

	return EFA_RDM_EP_DEFAULT_MEMORY_ALIGNMENT;
}

static inline
struct efa_domain *efa_rdm_ep_domain(struct efa_rdm_ep *ep)
Expand Down Expand Up @@ -547,4 +575,10 @@ fi_addr_t efa_rdm_ep_get_explicit_shm_fi_addr(struct efa_rdm_ep *ep, fi_addr_t a
conn = efa_av_addr_to_conn(ep->base_ep.av, addr);
return conn ? conn->shm_fi_addr : FI_ADDR_NOTAVAIL;
}

/**
 * @brief Number of TX packets the endpoint can still post.
 *
 * Computed as the max outstanding TX ops minus those already outstanding
 * and those queued due to RNR.
 *
 * @param ep efa rdm ep
 * @return size_t count of available TX packets
 */
static inline size_t efa_rdm_ep_get_available_tx_pkts(struct efa_rdm_ep *ep)
{
	size_t in_use = ep->efa_outstanding_tx_ops + ep->efa_rnr_queued_pkt_cnt;

	return ep->efa_max_outstanding_tx_ops - in_use;
}
#endif
16 changes: 10 additions & 6 deletions prov/efa/src/rdm/efa_rdm_ep_fiops.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ int efa_rdm_ep_create_buffer_pools(struct efa_rdm_ep *ep)
true, /* need memory registration */
efa_env.readcopy_pool_size,
efa_env.readcopy_pool_size, /* max_cnt==chunk_cnt means pool is not allowed to grow */
EFA_RDM_IN_ORDER_ALIGNMENT, /* support in-order aligned send/recv */
EFA_RDM_EP_IN_ORDER_ALIGNMENT, /* support in-order aligned send/recv */
0,
&ep->rx_readcopy_pkt_pool);
if (ret)
Expand Down Expand Up @@ -680,9 +680,13 @@ int efa_rdm_ep_open(struct fid_domain *domain, struct fi_info *info,
goto err_free_pke_vec;
}

efa_rdm_ep->send_pkt_entry_size_vec = calloc(sizeof(int), efa_base_ep_get_tx_pool_size(&efa_rdm_ep->base_ep));
if (!efa_rdm_ep->send_pkt_entry_size_vec) {
EFA_WARN(FI_LOG_EP_CTRL, "cannot alloc memory for efa_rdm_ep->send_pkt_entry_size_vec!\n");
efa_rdm_ep->send_pkt_entry_vec_data_sizes =
calloc(sizeof(size_t),
efa_base_ep_get_tx_pool_size(&efa_rdm_ep->base_ep));
if (!efa_rdm_ep->send_pkt_entry_vec_data_sizes) {
EFA_WARN(FI_LOG_EP_CTRL,
"cannot alloc memory for "
"efa_rdm_ep->send_pkt_entry_vec_data_sizes!\n");
ret = -FI_ENOMEM;
goto err_free_send_pkt_entry_vec;
}
Expand Down Expand Up @@ -1190,8 +1194,8 @@ static int efa_rdm_ep_close(struct fid *fid)
free(efa_rdm_ep->pke_vec);
if (efa_rdm_ep->send_pkt_entry_vec)
free(efa_rdm_ep->send_pkt_entry_vec);
if (efa_rdm_ep->send_pkt_entry_size_vec)
free(efa_rdm_ep->send_pkt_entry_size_vec);
if (efa_rdm_ep->send_pkt_entry_vec_data_sizes)
free(efa_rdm_ep->send_pkt_entry_vec_data_sizes);

ofi_genlock_unlock(&domain->srx_lock);

Expand Down
61 changes: 7 additions & 54 deletions prov/efa/src/rdm/efa_rdm_ep_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,36 +317,6 @@ int efa_rdm_ep_post_user_recv_buf(struct efa_rdm_ep *ep, struct efa_rdm_ope *rxe
return err;
}



/* create a new txe */
struct efa_rdm_ope *efa_rdm_ep_alloc_txe(struct efa_rdm_ep *efa_rdm_ep,
struct efa_rdm_peer *peer,
const struct fi_msg *msg,
uint32_t op,
uint64_t tag,
uint64_t flags)
{
struct efa_rdm_ope *txe;

txe = ofi_buf_alloc(efa_rdm_ep->ope_pool);
if (OFI_UNLIKELY(!txe)) {
EFA_DBG(FI_LOG_EP_CTRL, "TX entries exhausted.\n");
return NULL;
}

efa_rdm_txe_construct(txe, efa_rdm_ep, peer, msg, op, flags);
if (op == ofi_op_tagged) {
txe->cq_entry.tag = tag;
txe->tag = tag;
}

efa_domain_ope_list_lock(efa_rdm_ep_domain(efa_rdm_ep));
dlist_insert_tail(&txe->ep_entry, &efa_rdm_ep->txe_list);
Comment thread
charlesstoll marked this conversation as resolved.
efa_domain_ope_list_unlock(efa_rdm_ep_domain(efa_rdm_ep));
return txe;
}

/**
* @brief record the event that a TX op has been submitted
*
Expand Down Expand Up @@ -674,15 +644,18 @@ static ssize_t efa_rdm_ep_handshake_common(struct efa_rdm_ep *ep, struct efa_rdm

msg.addr = peer->conn->fi_addr;

txe = efa_rdm_ep_alloc_txe(ep, peer, &msg, ofi_op_write, 0, 0);

txe = ofi_buf_alloc(ep->ope_pool);
if (OFI_UNLIKELY(!txe)) {
EFA_WARN(FI_LOG_EP_CTRL, "TX entries exhausted.\n");
return -FI_EAGAIN;
}

/* efa_rdm_ep_alloc_txe() joins ep->base_ep.util_ep.tx_op_flags and passed in flags,
* reset to desired flags (remove things like FI_DELIVERY_COMPLETE, and FI_COMPLETION)
efa_rdm_txe_construct(txe, ep, peer, &msg, ofi_op_write, 0);

/*
* efa_rdm_txe_construct() joins ep->base_ep.util_ep.tx_op_flags and
* passed in flags, reset to desired flags (remove things like
* FI_DELIVERY_COMPLETE, and FI_COMPLETION)
*/
txe->fi_flags = EFA_RDM_TXE_NO_COMPLETION | EFA_RDM_TXE_NO_COUNTER;
txe->internal_flags |= EFA_RDM_OPE_INTERNAL;
Expand Down Expand Up @@ -1068,26 +1041,6 @@ void efa_rdm_ep_post_internal_rx_pkts(struct efa_rdm_ep *ep)
efa_base_ep_write_eq_error(&ep->base_ep, err, FI_EFA_ERR_INTERNAL_RX_BUF_POST);
}

/**
* @brief Get memory alignment for given ep and hmem iface
*
* @param ep efa rdm ep
* @param iface hmem iface
* @return size_t the memory alignment
*/
size_t efa_rdm_ep_get_memory_alignment(struct efa_rdm_ep *ep, enum fi_hmem_iface iface)
{
size_t memory_alignment = EFA_RDM_DEFAULT_MEMORY_ALIGNMENT;

if (ep->sendrecv_in_order_aligned_128_bytes) {
memory_alignment = EFA_RDM_IN_ORDER_ALIGNMENT;
} else if (iface == FI_HMEM_CUDA) {
memory_alignment = EFA_RDM_CUDA_MEMORY_ALIGNMENT;
}

return memory_alignment;
}

/**
* @brief Enforce a handshake to made for given txe.
* It will trigger a handshake with peer and choose to
Expand Down
16 changes: 15 additions & 1 deletion prov/efa/src/rdm/efa_rdm_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ ssize_t efa_rdm_msg_generic_send(struct efa_rdm_ep *ep, const struct fi_msg *msg
ssize_t err;
struct efa_rdm_ope *txe;
struct efa_rdm_peer *peer;
size_t available_tx_pkts;

efa_rdm_tracepoint(send_begin_msg_context,
(size_t) msg->context, (size_t) msg->addr);
Expand All @@ -177,12 +178,25 @@ ssize_t efa_rdm_msg_generic_send(struct efa_rdm_ep *ep, const struct fi_msg *msg
goto out;
}

txe = efa_rdm_ep_alloc_txe(ep, peer, msg, op, tag, flags);
// Handle case when there are no TX packets available
available_tx_pkts = efa_rdm_ep_get_available_tx_pkts(ep);
if (OFI_UNLIKELY(available_tx_pkts == 0)) {
err = -FI_EAGAIN;
goto out;
}

txe = ofi_buf_alloc(ep->ope_pool);
if (OFI_UNLIKELY(!txe)) {
err = -FI_EAGAIN;
goto out;
}

efa_rdm_txe_construct(txe, ep, peer, msg, op, flags);
if (op == ofi_op_tagged) {
txe->cq_entry.tag = tag;
txe->tag = tag;
}

assert(txe->op == ofi_op_msg || txe->op == ofi_op_tagged);

txe->msg_id = peer->next_msg_id++;
Expand Down
Loading
Loading