Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions prov/efa/src/efa_base_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ struct efa_base_ep {

/* entry for efa_domain->base_ep_list */
struct dlist_entry base_ep_entry;
/* Only used by EFA direct */
struct ofi_bufpool *efa_direct_ope_pool; /**< pool for efa_direct_ope */
struct dlist_entry efa_direct_ope_list; /**< list of outstanding ops */
};

int efa_base_ep_bind_av(struct efa_base_ep *base_ep, struct efa_av *av);
Expand Down
25 changes: 25 additions & 0 deletions prov/efa/src/efa_direct_ep.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */

#ifndef EFA_DIRECT_EP_H
#define EFA_DIRECT_EP_H

#include "efa_base_ep.h"

/**
* @brief EFA direct endpoint structure
*
* Wraps efa_base_ep as first member (for castability) and adds
* fields that are only used by the efa-direct path.
*/
struct efa_direct_ep {
struct efa_base_ep base_ep;

struct ofi_bufpool *ope_pool; /**< pool for efa_direct_ope */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment here explaining that ope_pool and ope_list are extra fields on efa_direct_ep because we need to track outstanding operations that still reference MRs. This should be a temporary addition that is eventually removed, or moved into efa_base_ep, once we have a unified approach that covers both efa and efa-direct.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about it more, I think we should keep ope_pool and ope_list in the base_ep and make efa_rdm_ep use them. efa_rdm_ep also has an ope_pool of the same type. efa_rdm_ep has separate txe_list and rxe_list, but they are only used for final cleanup. There is no reason we cannot combine them into a single ope_list, scan it once, and clean up each entry according to its ope type.

struct dlist_entry ope_list; /**< list of outstanding ops */
};

/*
 * Compile-time layout check: base_ep must sit at offset 0 of
 * struct efa_direct_ep, so the build fails if a field is ever placed
 * ahead of it.
 *
 * NOTE(review): static_assert and offsetof are normally provided by
 * <assert.h> and <stddef.h>; this header only includes "efa_base_ep.h"
 * directly - confirm that those standard headers are pulled in transitively.
 */
static_assert(offsetof(struct efa_direct_ep, base_ep) == 0,
"efa_base_ep must be first member of efa_direct_ep for container_of safety");

#endif /* EFA_DIRECT_EP_H */
53 changes: 30 additions & 23 deletions prov/efa/src/efa_direct_ope.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@
#include "efa.h"
#include "efa_direct_ope.h"

int efa_direct_ope_pool_create(struct efa_base_ep *base_ep)
int efa_direct_ope_pool_create(struct efa_direct_ep *ep)
{
int ret;

if (!efa_env.track_mr) {
base_ep->efa_direct_ope_pool = NULL;
ep->ope_pool = NULL;
return 0;
}

dlist_init(&base_ep->efa_direct_ope_list);
dlist_init(&ep->ope_list);

ret = ofi_bufpool_create(&base_ep->efa_direct_ope_pool,
ret = ofi_bufpool_create(&ep->ope_pool,
sizeof(struct efa_direct_ope),
EFA_RDM_BUFPOOL_ALIGNMENT,
base_ep->info->tx_attr->size + base_ep->info->rx_attr->size,
base_ep->info->tx_attr->size + base_ep->info->rx_attr->size,
ep->base_ep.info->tx_attr->size + ep->base_ep.info->rx_attr->size,
ep->base_ep.info->tx_attr->size + ep->base_ep.info->rx_attr->size,
OFI_BUFPOOL_NO_TRACK);
if (ret) {
EFA_WARN(FI_LOG_EP_CTRL,
Expand All @@ -36,48 +36,49 @@ int efa_direct_ope_pool_create(struct efa_base_ep *base_ep)
return ret;
}

ret = ofi_bufpool_grow(base_ep->efa_direct_ope_pool);
ret = ofi_bufpool_grow(ep->ope_pool);
if (ret) {
ofi_bufpool_destroy(base_ep->efa_direct_ope_pool);
base_ep->efa_direct_ope_pool = NULL;
ofi_bufpool_destroy(ep->ope_pool);
ep->ope_pool = NULL;
return ret;
}

EFA_INFO(FI_LOG_EP_CTRL, "ep %p: Created EFA direct op entry pool with size %zu\n",
base_ep, base_ep->info->tx_attr->size + base_ep->info->rx_attr->size);
ep, ep->base_ep.info->tx_attr->size + ep->base_ep.info->rx_attr->size);

return 0;
}

/*
 * Destroy the efa-direct operation entry pool of an endpoint.
 *
 * Does nothing when the endpoint has no pool. Otherwise every entry still
 * linked on the endpoint's ope list is unlinked and handed back with
 * ofi_buf_free() while the domain's util_domain.lock is held; afterwards
 * the pool itself is destroyed and the pool pointer is reset to NULL.
 *
 * NOTE(review): this span is a rendered diff, so both the pre-change lines
 * (base_ep->efa_direct_ope_pool / efa_direct_ope_list) and the post-change
 * lines (ep->ope_pool / ope_list) appear next to each other.
 */
void efa_direct_ope_pool_destroy(struct efa_base_ep *base_ep)
void efa_direct_ope_pool_destroy(struct efa_direct_ep *ep)
{
struct efa_direct_ope *direct_ope;
struct dlist_entry *tmp;

/* No pool to tear down: nothing to do. */
if (!base_ep->efa_direct_ope_pool)
if (!ep->ope_pool)
return;

/* Drain the list of outstanding entries under the domain lock. */
ofi_genlock_lock(&base_ep->domain->util_domain.lock);
if (!dlist_empty(&base_ep->efa_direct_ope_list)) {
dlist_foreach_container_safe(&base_ep->efa_direct_ope_list,
ofi_genlock_lock(&ep->base_ep.domain->util_domain.lock);
if (!dlist_empty(&ep->ope_list)) {
dlist_foreach_container_safe(&ep->ope_list,
struct efa_direct_ope,
direct_ope, entry, tmp) {
dlist_remove(&direct_ope->entry);
ofi_buf_free(direct_ope);
}
}
ofi_genlock_unlock(&base_ep->domain->util_domain.lock);
ofi_genlock_unlock(&ep->base_ep.domain->util_domain.lock);

/* The pool is destroyed after the lock has been released. */
EFA_INFO(FI_LOG_EP_CTRL, "ep %p: Destroying EFA direct op entry pool\n", base_ep);
ofi_bufpool_destroy(base_ep->efa_direct_ope_pool);
base_ep->efa_direct_ope_pool = NULL;
EFA_INFO(FI_LOG_EP_CTRL, "ep %p: Destroying EFA direct op entry pool\n", ep);
ofi_bufpool_destroy(ep->ope_pool);
ep->ope_pool = NULL;
}

struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
struct efa_context *context,
const struct fi_msg *msg,
const struct fi_msg_rma *msg_rma)
{
struct efa_direct_ep *ep = container_of(base_ep, struct efa_direct_ep, base_ep);
struct efa_direct_ope *direct_ope;
const struct iovec *msg_iov = msg ? msg->msg_iov : msg_rma->msg_iov;
void **desc = msg ? msg->desc : msg_rma->desc;
Expand All @@ -86,10 +87,10 @@ struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
uint64_t data = msg ? msg->data : msg_rma->data;
size_t i;

if (!base_ep->efa_direct_ope_pool)
if (!ep->ope_pool)
return NULL;

direct_ope = ofi_buf_alloc(base_ep->efa_direct_ope_pool);
direct_ope = ofi_buf_alloc(ep->ope_pool);
if (OFI_UNLIKELY(!direct_ope)) {
EFA_WARN(FI_LOG_EP_DATA,
"Failed to allocate EFA direct OPE\n");
Expand All @@ -110,7 +111,7 @@ struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
}

ofi_genlock_lock(&base_ep->domain->util_domain.lock);
dlist_insert_tail(&direct_ope->entry, &base_ep->efa_direct_ope_list);
dlist_insert_tail(&direct_ope->entry, &ep->ope_list);
ofi_genlock_unlock(&base_ep->domain->util_domain.lock);

return direct_ope;
Expand All @@ -119,7 +120,13 @@ struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
void efa_direct_ope_release(struct efa_base_ep *base_ep,
struct efa_direct_ope *direct_ope)
{
if (!direct_ope || !base_ep || !base_ep->efa_direct_ope_pool)
struct efa_direct_ep *ep;

if (!direct_ope || !base_ep)
return;

ep = container_of(base_ep, struct efa_direct_ep, base_ep);
if (!ep->ope_pool)
return;

ofi_genlock_lock(&base_ep->domain->util_domain.lock);
Expand Down
14 changes: 7 additions & 7 deletions prov/efa/src/efa_direct_ope.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#ifndef EFA_DIRECT_OPE_H
#define EFA_DIRECT_OPE_H

#include "efa_base_ep.h"
#include "efa_direct_ep.h"

/**
* @brief Maximum number of IOVs supported by EFA device
Expand Down Expand Up @@ -32,24 +32,24 @@ struct efa_direct_ope {
* and initializes the outstanding operation list.
* The pool is only created if FI_EFA_TRACK_MR is enabled.
*
* @param[in,out] base_ep base endpoint
* @param[in,out] ep efa-direct endpoint
* @return 0 on success, negative error code on failure
*/
int efa_direct_ope_pool_create(struct efa_base_ep *base_ep);
int efa_direct_ope_pool_create(struct efa_direct_ep *ep);

/**
* @brief Destroy the EFA direct operation entry pool
*
* Releases any outstanding operations and destroys the pool.
*
* @param[in,out] base_ep base endpoint
* @param[in,out] ep efa-direct endpoint
*/
void efa_direct_ope_pool_destroy(struct efa_base_ep *base_ep);
void efa_direct_ope_pool_destroy(struct efa_direct_ep *ep);

/**
* @brief Allocate and record an operation entry
*
* Acquires efa_domain->util_domain.lock to protect the efa_direct_ope_list.
* Acquires efa_domain->util_domain.lock to protect the ope_list.
* This is the same lock used by efa_mr_close when iterating across all EPs.
*
* @param[in,out] base_ep base endpoint
Expand All @@ -66,7 +66,7 @@ struct efa_direct_ope *efa_direct_ope_alloc(struct efa_base_ep *base_ep,
/**
* @brief Release an operation entry
*
* Acquires efa_domain->util_domain.lock to protect the efa_direct_ope_list.
* Acquires efa_domain->util_domain.lock to protect the ope_list.
* This is the same lock used by efa_mr_close when iterating across all EPs.
*
* @param[in,out] base_ep base endpoint
Expand Down
31 changes: 17 additions & 14 deletions prov/efa/src/efa_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "efa.h"
#include "efa_av.h"
#include "efa_cq.h"
#include "efa_direct_ep.h"

#include <infiniband/efadv.h>

Expand Down Expand Up @@ -203,18 +204,18 @@ static struct fi_ops_ep efa_ep_base_ops = {

static int efa_ep_close(fid_t fid)
{
struct efa_base_ep *ep;
struct efa_direct_ep *ep;
int ret;

ep = container_of(fid, struct efa_base_ep, util_ep.ep_fid.fid);
ep = container_of(fid, struct efa_direct_ep, base_ep.util_ep.ep_fid.fid);

/* We need to free the util_ep first to avoid race conditions
* with other threads progressing the cntr. */
efa_base_ep_close_util_ep(ep);
efa_base_ep_close_util_ep(&ep->base_ep);

efa_base_ep_remove_cntr_ibv_cq_poll_list(ep);
efa_base_ep_remove_cntr_ibv_cq_poll_list(&ep->base_ep);

ret = efa_base_ep_destruct(ep);
ret = efa_base_ep_destruct(&ep->base_ep);
if (ret) {
EFA_WARN(FI_LOG_EP_CTRL, "Unable to close base endpoint\n");
}
Expand Down Expand Up @@ -334,18 +335,20 @@ static int efa_ep_setflags(struct fid_ep *ep_fid, uint64_t flags)

static int efa_ep_enable(struct fid_ep *ep_fid)
{
struct efa_base_ep *ep;
struct efa_direct_ep *ep;
struct efa_base_ep *base_ep;
int err;

ep = container_of(ep_fid, struct efa_base_ep, util_ep.ep_fid);
ep = container_of(ep_fid, struct efa_direct_ep, base_ep.util_ep.ep_fid);
base_ep = &ep->base_ep;

err = efa_base_ep_create_and_enable_qp(ep);
err = efa_base_ep_create_and_enable_qp(base_ep);
if (err)
return err;

err = efa_base_ep_insert_cntr_ibv_cq_poll_list(ep);
err = efa_base_ep_insert_cntr_ibv_cq_poll_list(base_ep);
if (err) {
efa_base_ep_destruct_qp(ep);
efa_base_ep_destruct_qp(base_ep);
return err;
}

Expand Down Expand Up @@ -432,18 +435,18 @@ struct fi_ops_cm efa_ep_cm_ops = {
int efa_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info,
struct fid_ep **ep_fid, void *context)
{
struct efa_base_ep *ep;
struct efa_direct_ep *ep;
int ret;

ep = calloc(1, sizeof(*ep));
if (!ep)
return -FI_ENOMEM;

ret = efa_base_ep_construct(ep, domain_fid, user_info, efa_ep_progress_no_op, context);
ret = efa_base_ep_construct(&ep->base_ep, domain_fid, user_info, efa_ep_progress_no_op, context);
if (ret)
goto err_ep_destroy;

*ep_fid = &ep->util_ep.ep_fid;
*ep_fid = &ep->base_ep.util_ep.ep_fid;
(*ep_fid)->fid.fclass = FI_CLASS_EP;
(*ep_fid)->fid.context = context;
(*ep_fid)->fid.ops = &efa_ep_ops;
Expand All @@ -456,7 +459,7 @@ int efa_ep_open(struct fid_domain *domain_fid, struct fi_info *user_info,
return 0;

err_ep_destroy:
efa_base_ep_destruct(ep);
efa_base_ep_destruct(&ep->base_ep);
if (ep)
free(ep);
return ret;
Expand Down
13 changes: 12 additions & 1 deletion prov/efa/src/efa_mr.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "config.h"
#include <ofi_util.h>
#include "efa.h"
#include "efa_direct_ep.h"
#include "rdm/efa_rdm_ep.h"
#include "rdm/efa_rdm_ope.h"
#if HAVE_CUDA
Expand Down Expand Up @@ -197,9 +198,19 @@ static void efa_mr_close_check_inflight_ope(struct efa_mr *efa_mr)
ofi_genlock_lock(&efa_domain->util_domain.lock);
dlist_foreach_container(&efa_domain->base_ep_list, struct efa_base_ep,
base_ep, base_ep_entry) {
struct efa_direct_ep *ep;
struct efa_direct_ope *direct_ope;

/* Only efa-direct/DGRAM endpoints use struct efa_direct_ep.
* efa-protocol (RDM) endpoints use struct efa_rdm_ep,
* so container_of to efa_direct_ep would be invalid for them.
*/
if (EFA_INFO_TYPE_IS_RDM(base_ep->info))
continue;

ep = container_of(base_ep, struct efa_direct_ep, base_ep);
dlist_foreach_container_safe (
&base_ep->efa_direct_ope_list,
&ep->ope_list,
struct efa_direct_ope, direct_ope,
entry, tmp) {
efa_mr_close_warn_inflight_ope(direct_ope->desc, direct_ope->iov_count, &direct_ope->cq_entry, efa_mr, base_ep);
Expand Down
13 changes: 0 additions & 13 deletions prov/efa/src/efa_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,16 +424,3 @@ struct fi_ops_msg efa_msg_ops = {
.inject = efa_ep_msg_inject,
.injectdata = efa_ep_msg_injectdata,
};

/*
 * Message operation table named for DGRAM endpoints.
 *
 * The receive and send entry points are wired to the efa_ep_* handlers,
 * while inject and injectdata are assigned the fi_no_msg_* handlers
 * (presumably the libfabric "operation not supported" stubs - confirm).
 *
 * NOTE(review): the surrounding diff header reports this hunk as 13
 * deletions, i.e. this table is being removed by the change.
 */
struct fi_ops_msg efa_dgram_ep_msg_ops = {
.size = sizeof(struct fi_ops_msg),
.recv = efa_ep_recv,
.recvv = efa_ep_recvv,
.recvmsg = efa_ep_recvmsg,
.send = efa_ep_send,
.sendv = efa_ep_sendv,
.sendmsg = efa_ep_sendmsg,
.senddata = efa_ep_senddata,
.inject = fi_no_msg_inject,
.injectdata = fi_no_msg_injectdata,
};
13 changes: 1 addition & 12 deletions prov/efa/src/efa_rma.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,18 +442,7 @@ static ssize_t efa_rma_inject_writedata(struct fid_ep *ep, const void *buf, size
return err;
}

/*
 * RMA operation table named for DGRAM endpoints.
 *
 * Every read, write and inject entry is assigned the matching fi_no_rma_*
 * handler (presumably the libfabric "operation not supported" stubs -
 * confirm), so no RMA operation is backed by an EFA implementation here.
 *
 * NOTE(review): the surrounding diff header reports 12 deletions for this
 * file, which matches this table being removed by the change.
 */
struct fi_ops_rma efa_dgram_ep_rma_ops = {
.size = sizeof(struct fi_ops_rma),
.read = fi_no_rma_read,
.readv = fi_no_rma_readv,
.readmsg = fi_no_rma_readmsg,
.write = fi_no_rma_write,
.writev = fi_no_rma_writev,
.writemsg = fi_no_rma_writemsg,
.inject = fi_no_rma_inject,
.writedata = fi_no_rma_writedata,
.injectdata = fi_no_rma_injectdata,
};


struct fi_ops_rma efa_rma_ops = {
.size = sizeof(struct fi_ops_rma),
Expand Down
Loading