Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions prov/efa/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ _efa_headers = \
prov/efa/src/efa_cq.h \
prov/efa/src/efa_cntr.h \
prov/efa/src/efa_base_ep.h \
prov/efa/src/efa_direct_ep.h \
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have been part of your first commit.

prov/efa/src/efa_direct_ope.h \
prov/efa/src/efa_tp_def.h \
prov/efa/src/efa_tp.h \
Expand Down
3 changes: 0 additions & 3 deletions prov/efa/src/efa_base_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ struct efa_base_ep {

/* entry for efa_domain->base_ep_list */
struct dlist_entry base_ep_entry;
/* Only used by EFA direct */
struct ofi_bufpool *efa_direct_ope_pool; /**< pool for efa_direct_ope */
struct dlist_entry efa_direct_ope_list; /**< list of outstanding ops */
};

int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av);
Expand Down
25 changes: 25 additions & 0 deletions prov/efa/src/efa_direct_ep.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */

#ifndef EFA_DIRECT_EP_H
#define EFA_DIRECT_EP_H

#include "efa_base_ep.h"

/**
* @brief EFA direct endpoint structure
*
* Wraps efa_base_ep as first member (for castability) and adds
* fields that are only used by the efa-direct path.
*/
struct efa_direct_ep {
struct efa_base_ep base_ep;

struct ofi_bufpool *ope_pool; /**< pool for efa_direct_ope */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here explaining that ope_pool/ope_list are added to efa_direct_ep because we need to track the outstanding operations that still reference MRs. This should be a temporary addition that is eventually removed, or moved into efa_base_ep, once we have a unified approach that covers both efa and efa-direct.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it more, I think we should keep ope_pool and ope_list in the base_ep and make efa_rdm_ep use them. efa_rdm_ep also has an ope_pool of the same type. efa_rdm_ep has separate txe_list and rxe_list, but they are only used for final cleanup. There is no reason we cannot combine them into a single ope_list, scan it once, and clean up each entry according to its ope type.

struct dlist_entry ope_list; /**< list of outstanding ops */
};

static_assert(offsetof(struct efa_direct_ep, base_ep) == 0,
"efa_base_ep must be first member of efa_direct_ep for container_of safety");

#endif /* EFA_DIRECT_EP_H */
53 changes: 30 additions & 23 deletions prov/efa/src/efa_direct_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@
#include "efa.h"
#include "efa_direct_ope.h"

int efa_direct_ope_pool_create(struct efa_base_ep *base_ep)
int efa_direct_ope_pool_create(struct efa_direct_ep *ep)
{
int ret;

if (!efa_env.track_mr) {
base_ep->efa_direct_ope_pool = NULL;
ep->ope_pool = NULL;
return 0;
}

dlist_init(&base_ep->efa_direct_ope_list);
dlist_init(&ep->ope_list);

ret = ofi_bufpool_create(&base_ep->efa_direct_ope_pool,
ret = ofi_bufpool_create(&ep->ope_pool,
sizeof(struct efa_direct_ope),
EFA_RDM_BUFPOOL_ALIGNMENT,
base_ep->info->tx_attr->size + base_ep->info->rx_attr->size,
base_ep->info->tx_attr->size + base_ep->info->rx_attr->size,
ep->base_ep.info->tx_attr->size + ep->base_ep.info->rx_attr->size,
ep->base_ep.info->tx_attr->size + ep->base_ep.info->rx_attr->size,
OFI_BUFPOOL_NO_TRACK);
if (ret) {
EFA_WARN(FI_LOG_EP_CTRL,
Expand All @@ -36,48 +36,49 @@ int efa_direct_ope_pool_create(struct efa_base_ep *base_ep)
return ret;
}

ret = ofi_bufpool_grow(base_ep->efa_direct_ope_pool);
ret = ofi_bufpool_grow(ep->ope_pool);
if (ret) {
ofi_bufpool_destroy(base_ep->efa_direct_ope_pool);
base_ep->efa_direct_ope_pool = NULL;
ofi_bufpool_destroy(ep->ope_pool);
ep->ope_pool = NULL;
return ret;
}

EFA_INFO(FI_LOG_EP_CTRL, "ep %p: Created EFA direct op entry pool with size %zu\n",
base_ep, base_ep->info->tx_attr->size + base_ep->info->rx_attr->size);
ep, ep->base_ep.info->tx_attr->size + ep->base_ep.info->rx_attr->size);

return 0;
}

void efa_direct_ope_pool_destroy(struct efa_base_ep *base_ep)
void efa_direct_ope_pool_destroy(struct efa_direct_ep *ep)
{
struct efa_direct_ope *direct_ope;
struct dlist_entry *tmp;

if (!base_ep->efa_direct_ope_pool)
if (!ep->ope_pool)
return;

ofi_genlock_lock(&base_ep->domain->util_domain.lock);
if (!dlist_empty(&base_ep->efa_direct_ope_list)) {
dlist_foreach_container_safe(&base_ep->efa_direct_ope_list,
ofi_genlock_lock(&ep->base_ep.domain->util_domain.lock);
if (!dlist_empty(&ep->ope_list)) {
dlist_foreach_container_safe(&ep->ope_list,
struct efa_direct_ope,
direct_ope, entry, tmp) {
dlist_remove(&direct_ope->entry);
ofi_buf_free(direct_ope);
}
}
ofi_genlock_unlock(&base_ep->domain->util_domain.lock);
ofi_genlock_unlock(&ep->base_ep.domain->util_domain.lock);

EFA_INFO(FI_LOG_EP_CTRL, "ep %p: Destroying EFA direct op entry pool\n", base_ep);
ofi_bufpool_destroy(base_ep->efa_direct_ope_pool);
base_ep->efa_direct_ope_pool = NULL;
EFA_INFO(FI_LOG_EP_CTRL, "ep %p: Destroying EFA direct op entry pool\n", ep);
ofi_bufpool_destroy(ep->ope_pool);
ep->ope_pool = NULL;
}

struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
struct efa_context *context,
const struct fi_msg *msg,
const struct fi_msg_rma *msg_rma)
{
struct efa_direct_ep *ep = container_of(base_ep, struct efa_direct_ep, base_ep);
struct efa_direct_ope *direct_ope;
const struct iovec *msg_iov = msg ? msg->msg_iov : msg_rma->msg_iov;
void **desc = msg ? msg->desc : msg_rma->desc;
Expand All @@ -86,10 +87,10 @@ struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
uint64_t data = msg ? msg->data : msg_rma->data;
size_t i;

if (!base_ep->efa_direct_ope_pool)
if (!ep->ope_pool)
return NULL;

direct_ope = ofi_buf_alloc(base_ep->efa_direct_ope_pool);
direct_ope = ofi_buf_alloc(ep->ope_pool);
if (OFI_UNLIKELY(!direct_ope)) {
EFA_WARN(FI_LOG_EP_DATA,
"Failed to allocate EFA direct OPE\n");
Expand All @@ -110,7 +111,7 @@ struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
}

ofi_genlock_lock(&base_ep->domain->util_domain.lock);
dlist_insert_tail(&direct_ope->entry, &base_ep->efa_direct_ope_list);
dlist_insert_tail(&direct_ope->entry, &ep->ope_list);
ofi_genlock_unlock(&base_ep->domain->util_domain.lock);

return direct_ope;
Expand All @@ -119,7 +120,13 @@ struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
void efa_direct_ope_release(struct efa_base_ep *base_ep,
struct efa_direct_ope *direct_ope)
{
if (!direct_ope || !base_ep || !base_ep->efa_direct_ope_pool)
struct efa_direct_ep *ep;

if (!direct_ope || !base_ep)
return;

ep = container_of(base_ep, struct efa_direct_ep, base_ep);
if (!ep->ope_pool)
return;

ofi_genlock_lock(&base_ep->domain->util_domain.lock);
Expand Down
14 changes: 7 additions & 7 deletions prov/efa/src/efa_direct_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#ifndef EFA_DIRECT_OPE_H
#define EFA_DIRECT_OPE_H

#include "efa_base_ep.h"
#include "efa_direct_ep.h"

/**
* @brief Maximum number of IOVs supported by EFA device
Expand Down Expand Up @@ -32,24 +32,24 @@ struct efa_direct_ope {
* and initializes the outstanding operation lists.
* The pool is only created if FI_EFA_TRACK_MR is enabled.
*
* @param[in,out] base_ep base endpoint
* @param[in,out] ep efa-direct endpoint
* @return 0 on success, negative error code on failure
*/
int efa_direct_ope_pool_create(struct efa_base_ep *base_ep);
int efa_direct_ope_pool_create(struct efa_direct_ep *ep);

/**
* @brief Destroy the EFA direct operation entry pool
*
* Releases any outstanding operations and destroys the pool.
*
* @param[in,out] base_ep base endpoint
* @param[in,out] ep efa-direct endpoint
*/
void efa_direct_ope_pool_destroy(struct efa_base_ep *base_ep);
void efa_direct_ope_pool_destroy(struct efa_direct_ep *ep);

/**
* @brief Allocate and record an operation entry
*
* Acquires efa_domain->util_domain.lock to protect the efa_direct_ope_list.
* Acquires efa_domain->util_domain.lock to protect the ope_list.
* This is the same lock used by efa_mr_close when iterating across all EPs.
*
* @param[in,out] base_ep base endpoint
Expand All @@ -66,7 +66,7 @@ struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
/**
* @brief Release an operation entry
*
* Acquires efa_domain->util_domain.lock to protect the efa_direct_ope_list.
* Acquires efa_domain->util_domain.lock to protect the ope_list.
* This is the same lock used by efa_mr_close when iterating across all EPs.
*
* @param[in,out] base_ep base endpoint
Expand Down
31 changes: 17 additions & 14 deletions prov/efa/src/efa_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "efa.h"
#include "efa_av.h"
#include "efa_cq.h"
#include "efa_direct_ep.h"

#include <infiniband/efadv.h>

Expand Down Expand Up @@ -203,18 +204,18 @@ static struct fi_ops_ep efa_ep_base_ops = {

static int efa_ep_close(fid_t fid)
{
struct efa_base_ep *ep;
struct efa_direct_ep *ep;
int ret;

ep = container_of(fid, struct efa_base_ep, util_ep.ep_fid.fid);
ep = container_of(fid, struct efa_direct_ep, base_ep.util_ep.ep_fid.fid);

/* We need to free the util_ep first to avoid race conditions
* with other threads progressing the cntr. */
efa_base_ep_close_util_ep(ep);
efa_base_ep_close_util_ep(&ep->base_ep);

efa_base_ep_remove_cntr_ibv_cq_poll_list(ep);
efa_base_ep_remove_cntr_ibv_cq_poll_list(&ep->base_ep);

ret = efa_base_ep_destruct(ep);
ret = efa_base_ep_destruct(&ep->base_ep);
if (ret) {
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close base endpoint\n");
}
Expand Down Expand Up @@ -334,18 +335,20 @@ static int efa_ep_setflags(struct fid_ep *ep_fid, uint64_t flags)

static int efa_ep_enable(struct fid_ep *ep_fid)
{
struct efa_base_ep *ep;
struct efa_direct_ep *ep;
struct efa_base_ep *base_ep;
int err;

ep = container_of(ep_fid, struct efa_base_ep, util_ep.ep_fid);
ep = container_of(ep_fid, struct efa_direct_ep, base_ep.util_ep.ep_fid);
base_ep = &ep->base_ep;

err = efa_base_ep_create_and_enable_qp(ep);
err = efa_base_ep_create_and_enable_qp(base_ep);
if (err)
return err;

err = efa_base_ep_insert_cntr_ibv_cq_poll_list(ep);
err = efa_base_ep_insert_cntr_ibv_cq_poll_list(base_ep);
if (err) {
efa_base_ep_destruct_qp(ep);
efa_base_ep_destruct_qp(base_ep);
return err;
}

Expand Down Expand Up @@ -432,18 +435,18 @@ struct fi_ops_cm efa_ep_cm_ops = {
int efa_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info,
struct fid_ep **ep_fid, void *context)
{
struct efa_base_ep *ep;
struct efa_direct_ep *ep;
int ret;

ep = calloc(1, sizeof(*ep));
if (!ep)
return -FI_ENOMEM;

ret = efa_base_ep_construct(ep, domain_fid, user_info, efa_ep_progress_no_op, context);
ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_ep_progress_no_op, context);
if (ret)
goto err_ep_destroy;

*ep_fid = &ep->util_ep.ep_fid;
*ep_fid = &ep->base_ep.util_ep.ep_fid;
(*ep_fid)->fid.fclass = FI_CLASS_EP;
(*ep_fid)->fid.context = context;
(*ep_fid)->fid.ops = &efa_ep_ops;
Expand All @@ -456,7 +459,7 @@ int efa_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info,
return 0;

err_ep_destroy:
efa_base_ep_destruct(ep);
efa_base_ep_destruct(&ep->base_ep);
if (ep)
free(ep);
return ret;
Expand Down
13 changes: 12 additions & 1 deletion prov/efa/src/efa_mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "config.h"
#include <ofi_util.h>
#include "efa.h"
#include "efa_direct_ep.h"
#include "rdm/efa_rdm_ep.h"
#include "rdm/efa_rdm_ope.h"
#if HAVE_CUDA
Expand Down Expand Up @@ -197,9 +198,19 @@ static void efa_mr_close_check_inflight_ope(struct efa_mr *efa_mr)
ofi_genlock_lock(&efa_domain->util_domain.lock);
dlist_foreach_container(&efa_domain->base_ep_list, struct efa_base_ep,
base_ep, base_ep_entry) {
struct efa_direct_ep *ep;
struct efa_direct_ope *direct_ope;

/* Only efa-direct/DGRAM endpoints use struct efa_direct_ep.
* efa-protocol (RDM) endpoints use struct efa_rdm_ep,
* so container_of to efa_direct_ep would be invalid for them.
*/
if (EFA_INFO_TYPE_IS_RDM(base_ep->info))
continue;

ep = container_of(base_ep, struct efa_direct_ep, base_ep);
dlist_foreach_container_safe (
&base_ep->efa_direct_ope_list,
&ep->ope_list,
struct efa_direct_ope, direct_ope,
entry, tmp) {
efa_mr_close_warn_inflight_ope(direct_ope->desc, direct_ope->iov_count, &direct_ope->cq_entry, efa_mr, base_ep);
Expand Down
13 changes: 0 additions & 13 deletions prov/efa/src/efa_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,16 +424,3 @@ struct fi_ops_msg efa_msg_ops = {
.inject = efa_ep_msg_inject,
.injectdata = efa_ep_msg_injectdata,
};

struct fi_ops_msg efa_dgram_ep_msg_ops = {
.size = sizeof(struct fi_ops_msg),
.recv = efa_ep_recv,
.recvv = efa_ep_recvv,
.recvmsg = efa_ep_recvmsg,
.send = efa_ep_send,
.sendv = efa_ep_sendv,
.sendmsg = efa_ep_sendmsg,
.senddata = efa_ep_senddata,
.inject = fi_no_msg_inject,
.injectdata = fi_no_msg_injectdata,
};
13 changes: 1 addition & 12 deletions prov/efa/src/efa_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,18 +442,7 @@ static ssize_t efa_rma_inject_writedata(struct fid_ep *ep, const void *buf, size
return err;
}

struct fi_ops_rma efa_dgram_ep_rma_ops = {
.size = sizeof(struct fi_ops_rma),
.read = fi_no_rma_read,
.readv = fi_no_rma_readv,
.readmsg = fi_no_rma_readmsg,
.write = fi_no_rma_write,
.writev = fi_no_rma_writev,
.writemsg = fi_no_rma_writemsg,
.inject = fi_no_rma_inject,
.writedata = fi_no_rma_writedata,
.injectdata = fi_no_rma_injectdata,
};


struct fi_ops_rma efa_rma_ops = {
.size = sizeof(struct fi_ops_rma),
Expand Down
Loading
Loading