diff --git a/fabtests/test_configs/lnx/lnx.exclude b/fabtests/test_configs/lnx/lnx.exclude index 088140cea14..e49287a21b0 100644 --- a/fabtests/test_configs/lnx/lnx.exclude +++ b/fabtests/test_configs/lnx/lnx.exclude @@ -32,6 +32,7 @@ sighandler_test # Uses FI_RMA multi_mr rma +mr_test # Tests that are broken recv_cancel diff --git a/prov/lnx/Makefile.include b/prov/lnx/Makefile.include index 78e034d79b3..6b6f1ac9816 100644 --- a/prov/lnx/Makefile.include +++ b/prov/lnx/Makefile.include @@ -37,9 +37,10 @@ _lnx_files = \ prov/lnx/src/lnx_domain.c \ prov/lnx/src/lnx_ep.c \ prov/lnx/src/lnx_init.c \ - prov/lnx/src/lnx_ops.c \ + prov/lnx/src/lnx_srx.c \ prov/lnx/src/lnx_mr.c \ - prov/lnx/src/lnx_av.c + prov/lnx/src/lnx_av.c \ + prov/lnx/src/lnx_msg.c _lnx_headers = \ prov/lnx/include/lnx.h diff --git a/prov/lnx/include/lnx.h b/prov/lnx/include/lnx.h index 203eac516b9..aafd3a8f884 100644 --- a/prov/lnx/include/lnx.h +++ b/prov/lnx/include/lnx.h @@ -1,5 +1,6 @@ /* * Copyright (c) 2022 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. 
You may choose to be licensed under the terms of the GNU @@ -33,17 +34,31 @@ #ifndef LNX_H #define LNX_H -#define LNX_NUM_HISTORY 4096 +#include "ofi_lock.h" +#include "ofi_util.h" + #define LNX_MAX_LOCAL_EPS 16 #define LNX_IOV_LIMIT 4 -#define LNX_MAX_PRIMARY_ID ((1ULL << 56) - 1) -#define LNX_MAX_SUB_ID ((1ULL << 8) - 1) - -#define LNX_PER_MSG_SELECTION_STR "PER_MSG" -#define LNX_PER_PEER_SELECTION_STR "PER_PEER" +#define LNX_SUB_ID_BITS 8 +#define LNX_MAX_SUB_ID ((1ULL << LNX_SUB_ID_BITS) - 1) #define lnx_ep_rx_flags(lnx_ep) ((lnx_ep)->le_ep.rx_op_flags) +enum lnx_multirail_selection { + LNX_MR_SELECTION_PER_MSG = 0, + LNX_MR_SELECTION_PER_PEER, + LNX_MR_SELECTION_MAX, +}; + +struct lnx_env { + enum lnx_multirail_selection mr; + char *prov_links; + int disable_shm; + int dump_stats; +}; + +extern struct lnx_env lnx_env; + struct lnx_match_attr { fi_addr_t lm_addr; uint64_t lm_tag; @@ -167,12 +182,6 @@ struct lnx_domain { int ld_ep_idx; }; -enum lnx_multirail_selection { - LNX_MR_SELECTION_PER_MSG = 0, - LNX_MR_SELECTION_PER_PEER, - LNX_MR_SELECTION_MAX, -}; - struct lnx_ep { int le_idx; struct util_ep le_ep; @@ -224,34 +233,26 @@ struct lnx_rx_entry { bool rx_global; }; -OFI_DECLARE_FREESTACK(struct lnx_rx_entry, lnx_recv_fs); - +extern struct fi_ops_msg lnx_msg_ops; +extern struct fi_ops_tagged lnx_tagged_ops; +extern struct fi_ops_rma lnx_rma_ops; +extern struct fi_ops_atomic lnx_atomic_ops; +extern struct fi_ops_srx_owner lnx_srx_ops; extern struct util_prov lnx_util_prov; extern struct fi_provider lnx_prov; extern struct ofi_bufpool *global_recv_bp; extern ofi_spin_t global_bplock; -int lnx_getinfo(uint32_t version, const char *node, const char *service, - uint64_t flags, const struct fi_info *hints, - struct fi_info **info); - -int lnx_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric, - void *context); int lnx_setup_fabrics(char *name, struct lnx_fabric *lnx_fab, void *context); -void lnx_fini(void); - -int lnx_fabric_close(struct fid *fid); - 
int lnx_domain_open(struct fid_fabric *fabric, struct fi_info *info, struct fid_domain **dom, void *context); int lnx_av_open(struct fid_domain *domain, struct fi_av_attr *attr, struct fid_av **av, void *context); -struct lnx_peer * -lnx_av_lookup_addr(struct lnx_av *av, fi_addr_t addr); +struct lnx_peer *lnx_av_lookup_addr(struct lnx_av *av, fi_addr_t addr); int lnx_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, struct fid_cq **cq, void *context); @@ -259,32 +260,25 @@ int lnx_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, int lnx_endpoint(struct fid_domain *domain, struct fi_info *info, struct fid_ep **ep, void *context); -int lnx_cq2ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags); - -int lnx_get_msg(struct fid_peer_srx *srx, struct fi_peer_match_attr *match, - struct fi_peer_rx_entry **entry); -int lnx_get_tag(struct fid_peer_srx *srx, struct fi_peer_match_attr *match, - struct fi_peer_rx_entry **entry); -int lnx_queue_msg(struct fi_peer_rx_entry *entry); -int lnx_queue_tag(struct fi_peer_rx_entry *entry); -void lnx_free_entry(struct fi_peer_rx_entry *entry); -void lnx_foreach_unspec_addr(struct fid_peer_srx *srx, - fi_addr_t (*get_addr)(struct fi_peer_rx_entry *)); - int lnx_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr, uint64_t flags, struct fid_mr **mr_fid); int lnx_mr_regattr_core(struct lnx_core_domain *cd, void *desc, void **core_desc); -static inline fi_addr_t -lnx_encode_fi_addr(uint64_t primary_id, uint8_t sub_id) +int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, + fi_addr_t addr, size_t count, uint64_t tag, + uint64_t ignore, void *context, uint64_t flags, + bool tagged); + +static inline fi_addr_t lnx_encode_fi_addr(uint64_t primary_id, uint8_t sub_id) { return (primary_id << 8) | sub_id; } static inline fi_addr_t lnx_decode_primary_id(uint64_t fi_addr) { - return (fi_addr == FI_ADDR_UNSPEC) ? fi_addr : fi_addr >> 8; + return (fi_addr == FI_ADDR_UNSPEC) ? 
fi_addr : + fi_addr >> LNX_SUB_ID_BITS; } static inline uint8_t lnx_decode_sub_id(uint64_t fi_addr) @@ -292,8 +286,8 @@ static inline uint8_t lnx_decode_sub_id(uint64_t fi_addr) return (fi_addr == FI_ADDR_UNSPEC) ? fi_addr : fi_addr & LNX_MAX_SUB_ID; } -static inline fi_addr_t -lnx_get_core_addr(struct lnx_core_ep *cep, fi_addr_t addr) +static inline fi_addr_t lnx_get_core_addr(struct lnx_core_ep *cep, + fi_addr_t addr) { struct lnx_peer_map *map_addr; uint8_t idx = lnx_decode_sub_id(addr); @@ -306,131 +300,13 @@ lnx_get_core_addr(struct lnx_core_ep *cep, fi_addr_t addr) return map_addr->map_addrs[idx]; } -/* - * Round robin over the end points and peer addresses per message - * This does not take into consideration send after send requirements. - */ -static inline int -lnx_select_send_endpoints_msg(struct lnx_ep *lep, fi_addr_t lnx_addr, - struct lnx_core_ep **cep_out, fi_addr_t *core_addr) -{ - int idx, rr; - struct lnx_peer *lp; - struct lnx_core_ep *cep; - struct lnx_peer_map *map_addr; - struct lnx_peer_ep_map *ep_map; - - lp = lnx_av_lookup_addr(lep->le_lav, lnx_addr); - if (!lp) - return -FI_ENOSYS; - - /* round robin over the endpoints which can reach this peer */ - rr = ofi_atomic_inc32(&lp->lp_ep_rr) - 1; - ep_map = &lp->lp_src_eps[lep->le_idx]; - idx = rr % ep_map->pem_num_eps; - cep = ep_map->pem_eps[idx]; - - map_addr = ofi_bufpool_get_ibuf(cep->cep_cav->cav_map, lp->lp_addr); - - /* round robin over available peer addresses */ - rr = ofi_atomic_inc32(&map_addr->map_rr) - 1; - idx = rr % (map_addr->map_count); - - *core_addr = map_addr->map_addrs[idx]; - - *cep_out = cep; - - return FI_SUCCESS; -} - -/* - * Round robin over the end points, assigning each peer a particular local - * endpoint and one of its core addresses to use. This ensures that the - * peer receives messages in the same order they are sent. 
- */ -static inline int -lnx_select_send_endpoints_peer(struct lnx_ep *lep, fi_addr_t lnx_addr, - struct lnx_core_ep **cep_out, fi_addr_t *core_addr) -{ - bool found = false; - int idx, i; - uint32_t rr, seed; - struct lnx_peer *lp; - struct lnx_core_ep *cep; - struct lnx_peer_map *map_addr; - struct lnx_peer_ep_map *ep_map; - - lp = lnx_av_lookup_addr(lep->le_lav, lnx_addr); - if (!lp) - return -FI_ENOSYS; - - if (lp->lp_locked_cep) { - *core_addr = lp->lp_locked_core_addr; - *cep_out = lp->lp_locked_cep; - return FI_SUCCESS; - } - - ep_map = &lp->lp_src_eps[lep->le_idx]; - - while (!found) { - rr = ofi_atomic_inc32(&lep->le_rr) - 1; - idx = rr % lep->le_domain->ld_num_doms; - cep = &lep->le_core_eps[idx]; - - for (i = 0; i < ep_map->pem_num_eps; i++) { - if (cep == ep_map->pem_eps[i]) { - found = true; - break; - } - } - } - - map_addr = ofi_bufpool_get_ibuf(cep->cep_cav->cav_map, lp->lp_addr); - - /* randomize the address selection from the list of reachable - * peer addresses. This will allow different processes to send to - * different peer addresses, instead of using the first address of - * a peer. This is not round robin over the peer addresses, but - * in the PER_PEER case, there is no effective way to enforce - * round robin across independent peers. In this method it at - * least allows spreading traffic across peer addresses. */ - seed = (uint32_t)ofi_gettime_ns(); - rr = ofi_xorshift_random(seed); - idx = rr % (map_addr->map_count); - - *core_addr = lp->lp_locked_core_addr = map_addr->map_addrs[idx]; - *cep_out = lp->lp_locked_cep = cep; - - return FI_SUCCESS; -} - -static inline void -lnx_set_send_pair_noop(struct lnx_ep *lep, struct lnx_core_ep *cep, fi_addr_t addr) -{ -} - -/* if you haven't sent to a particular peer before then lock the - * address you're going to send on. If this is the first message - * you receive from peer, then you want to respond on the same - * address and core endpoint that peer used. This will ensure - * symmetry. 
-*/ -static inline void -lnx_set_send_pair_peer(struct lnx_ep *lep, struct lnx_core_ep *cep, fi_addr_t addr) -{ - fi_addr_t core_addr, prim_addr; - struct lnx_peer *lp; +typedef int (*lnx_select_tx_ep)( + struct lnx_ep *lep, fi_addr_t lnx_addr, + struct lnx_core_ep **cep_out, fi_addr_t *core_addr); +extern lnx_select_tx_ep lnx_select_send_endpoints[LNX_MR_SELECTION_MAX]; - if (addr == FI_ADDR_UNSPEC) - return; - - prim_addr = lnx_decode_primary_id(addr); - lp = lnx_av_lookup_addr(lep->le_lav, prim_addr); - if (!lp->lp_locked_cep) { - core_addr = lnx_get_core_addr(cep, addr); - lp->lp_locked_core_addr = core_addr; - lp->lp_locked_cep = cep; - } -} +typedef void (*lnx_set_tx_pair)( + struct lnx_ep *lep, struct lnx_core_ep *cep, fi_addr_t addr); +extern lnx_set_tx_pair lnx_set_send_pair[LNX_MR_SELECTION_MAX]; #endif /* LNX_H */ diff --git a/prov/lnx/src/lnx_av.c b/prov/lnx/src/lnx_av.c index e1b9dbe5712..a0b5704e9fe 100644 --- a/prov/lnx/src/lnx_av.c +++ b/prov/lnx/src/lnx_av.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2022 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -30,36 +31,9 @@ * SOFTWARE. 
*/ -#include "config.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include "ofi_util.h" -#include "ofi.h" -#include "ofi_str.h" -#include "ofi_prov.h" -#include "ofi_perf.h" -#include "ofi_hmem.h" -#include "rdma/fi_ext.h" #include "lnx.h" -struct lnx_peer * -lnx_av_lookup_addr(struct lnx_av *av, fi_addr_t addr) +struct lnx_peer *lnx_av_lookup_addr(struct lnx_av *av, fi_addr_t addr) { struct lnx_peer *lp, **lpp; @@ -80,7 +54,7 @@ lnx_av_lookup_addr(struct lnx_av *av, fi_addr_t addr) return lp; } -int lnx_av_close(struct fid *fid) +static int lnx_av_close(struct fid *fid) { int i, rc, frc = 0; struct lnx_core_av *core_av; @@ -110,35 +84,6 @@ static struct fi_ops lnx_av_fi_ops = { .ops_open = fi_no_ops_open, }; -static void -lnx_update_msg_entries(struct lnx_qpair *qp, - fi_addr_t (*get_addr)(struct fi_peer_rx_entry *)) -{ - struct lnx_queue *q = &qp->lqp_unexq; - struct lnx_rx_entry *rx_entry; - struct dlist_entry *item; - - dlist_foreach(&q->lq_queue, item) { - rx_entry = (struct lnx_rx_entry *) item; - if (rx_entry->rx_entry.addr == FI_ADDR_UNSPEC) - rx_entry->rx_entry.addr = get_addr(&rx_entry->rx_entry); - } -} - -void -lnx_foreach_unspec_addr(struct fid_peer_srx *srx, - fi_addr_t (*get_addr)(struct fi_peer_rx_entry *)) -{ - struct lnx_ep *lep; - struct lnx_core_ep *cep; - - cep = (struct lnx_core_ep *) srx->ep_fid.fid.context; - lep = cep->cep_parent; - - lnx_update_msg_entries(&lep->le_srq.lps_trecv, get_addr); - lnx_update_msg_entries(&lep->le_srq.lps_recv, get_addr); -} - static int lnx_peer_av_remove(struct lnx_peer *lp) { int i, j, rc, frc = 0; @@ -151,7 +96,8 @@ static int lnx_peer_av_remove(struct lnx_peer *lp) break; map_addr = ofi_bufpool_get_ibuf(cav->cav_map, lp->lp_addr); for (j = 0; j < map_addr->map_count; j++) { - rc = fi_av_remove(cav->cav_av, &map_addr->map_addrs[j], 1, 0); + rc = fi_av_remove(cav->cav_av, 
&map_addr->map_addrs[j], + 1, 0); if (rc) frc = rc; } @@ -160,8 +106,7 @@ static int lnx_peer_av_remove(struct lnx_peer *lp) return frc; } -static void -lnx_free_src_eps(struct lnx_peer_ep_map *src_eps, int count) +static void lnx_free_src_eps(struct lnx_peer_ep_map *src_eps, int count) { int i; @@ -197,8 +142,8 @@ static int lnx_peer_remove(struct lnx_av *lav, fi_addr_t addr) return rc; } -int lnx_av_remove(struct fid_av *av, fi_addr_t *fi_addr, size_t count, - uint64_t flags) +static int lnx_av_remove(struct fid_av *av, fi_addr_t *fi_addr, size_t count, + uint64_t flags) { struct lnx_av *lav; int frc = 0, rc, i; @@ -214,9 +159,8 @@ int lnx_av_remove(struct fid_av *av, fi_addr_t *fi_addr, size_t count, return frc; } -static int -lnx_setup_ep_mapping(struct lnx_peer *lp, - struct lnx_core_ep *ceps[][LNX_MAX_LOCAL_EPS]) +static int lnx_setup_ep_mapping(struct lnx_peer *lp, + struct lnx_core_ep *ceps[][LNX_MAX_LOCAL_EPS]) { int i = 0, j = 0; size_t size; @@ -240,7 +184,8 @@ lnx_setup_ep_mapping(struct lnx_peer *lp, return -FI_ENOMEM; for (j = 0; j < i; j++) { - lp->lp_src_eps[j].pem_eps = calloc(sizeof(struct lnx_core_ep *)*counts[j], 1); + lp->lp_src_eps[j].pem_eps = calloc( + sizeof(struct lnx_core_ep *) * counts[j], 1); if (!lp->lp_src_eps[j].pem_eps) { lnx_free_src_eps(lp->lp_src_eps, lp->lp_ep_count); return -FI_ENOMEM; @@ -261,10 +206,10 @@ lnx_setup_ep_mapping(struct lnx_peer *lp, return FI_SUCCESS; } -static int -lnx_insert_addr(struct lnx_core_av *core_av, struct lnx_ep_addr *addr, - struct lnx_peer *lp, - struct lnx_core_ep *cep_tmp[][LNX_MAX_LOCAL_EPS], bool local) +static int lnx_insert_addr(struct lnx_core_av *core_av, + struct lnx_ep_addr *addr, struct lnx_peer *lp, + struct lnx_core_ep *cep_tmp[][LNX_MAX_LOCAL_EPS], + bool local) { int i, rc; bool present; @@ -284,8 +229,8 @@ lnx_insert_addr(struct lnx_core_av *core_av, struct lnx_ep_addr *addr, return FI_SUCCESS; /* cache the endpoint information */ - dlist_foreach_container(&core_av->cav_endpoints, 
struct lnx_core_ep, cep, - cep_av_entry) { + dlist_foreach_container(&core_av->cav_endpoints, struct lnx_core_ep, + cep, cep_av_entry) { lep = cep->cep_parent; present = false; i = 0; @@ -310,7 +255,8 @@ lnx_insert_addr(struct lnx_core_av *core_av, struct lnx_ep_addr *addr, if (!cav) break; if (cav == core_av) { - map_addr = ofi_bufpool_get_ibuf(core_av->cav_map, lp->lp_addr); + map_addr = ofi_bufpool_get_ibuf(core_av->cav_map, + lp->lp_addr); goto insert; } } @@ -323,7 +269,8 @@ lnx_insert_addr(struct lnx_core_av *core_av, struct lnx_ep_addr *addr, insert: core_fi_addr = lnx_encode_fi_addr(lp->lp_addr, map_addr->map_count); - rc = fi_av_insert(core_av->cav_av, core_addr, 1, &core_fi_addr, FI_AV_USER_ID, NULL); + rc = fi_av_insert(core_av->cav_av, core_addr, 1, &core_fi_addr, + FI_AV_USER_ID, NULL); if (rc <= 0) return rc; @@ -337,7 +284,6 @@ int lnx_av_insert(struct fid_av *av, const void *addr, size_t count, { int i, j, k, rc; bool local, once; - int disable_shm = 0; struct lnx_peer *lp; char hostname[FI_NAME_MAX]; struct lnx_av *lav; @@ -348,8 +294,6 @@ int lnx_av_insert(struct fid_av *av, const void *addr, size_t count, if (flags & FI_AV_USER_ID) return -FI_ENOSYS; - fi_param_get_bool(&lnx_prov, "disable_shm", &disable_shm); - lav = container_of(av, struct lnx_av, lav_av.av_fid.fid); rc = gethostname(hostname, FI_NAME_MAX); @@ -381,7 +325,7 @@ int lnx_av_insert(struct fid_av *av, const void *addr, size_t count, * inter-node providers */ if (!strcmp(hostname, la->la_hostname) && - !strcmp(lea->lea_prov, "shm") && !disable_shm) + !strcmp(lea->lea_prov, "shm") && !lnx_env.disable_shm) local = true; ofi_genlock_lock(&lav->lav_av.lock); @@ -402,10 +346,12 @@ int lnx_av_insert(struct fid_av *av, const void *addr, size_t count, for (k = 0; k < lav->lav_domain->ld_num_doms; k++) { core_av = &lav->lav_core_avs[k]; - rc = lnx_insert_addr(core_av, lea, lp, cep_tmp, local); + rc = lnx_insert_addr(core_av, lea, lp, cep_tmp, + local); if (rc) { - (void) 
lnx_av_remove(&lav->lav_av.av_fid, - &lp->lp_addr, 1, 0); + (void) lnx_av_remove( + &lav->lav_av.av_fid, + &lp->lp_addr, 1, 0); return rc; } if (local) { @@ -415,12 +361,13 @@ int lnx_av_insert(struct fid_av *av, const void *addr, size_t count, } skip: lea = (struct lnx_ep_addr *) - ((char*)lea + sizeof(*lea) + lea->lea_addr_size); + ((char*)lea + sizeof(*lea) + + lea->lea_addr_size); } rc = lnx_setup_ep_mapping(lp, cep_tmp); if (rc) { (void) lnx_av_remove(&lav->lav_av.av_fid, - &lp->lp_addr, 1, 0); + &lp->lp_addr, 1, 0); return rc; } @@ -430,17 +377,15 @@ int lnx_av_insert(struct fid_av *av, const void *addr, size_t count, return i; } -static const char * -lnx_av_straddr(struct fid_av *av, const void *addr, - char *buf, size_t *len) +static const char * lnx_av_straddr(struct fid_av *av, const void *addr, + char *buf, size_t *len) { /* TODO: implement */ return NULL; } -static int -lnx_av_lookup(struct fid_av *av, fi_addr_t fi_addr, void *addr, - size_t *addrlen) +static int lnx_av_lookup(struct fid_av *av, fi_addr_t fi_addr, void *addr, + size_t *addrlen) { /* TODO: implement */ return -FI_EOPNOTSUPP; @@ -547,5 +492,3 @@ int lnx_av_open(struct fid_domain *domain, struct fi_av_attr *attr, out: return rc; } - - diff --git a/prov/lnx/src/lnx_cq.c b/prov/lnx/src/lnx_cq.c index ee205293aa5..e021fadeff1 100644 --- a/prov/lnx/src/lnx_cq.c +++ b/prov/lnx/src/lnx_cq.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2022 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -30,23 +31,6 @@ * SOFTWARE. 
*/ -#include "config.h" - -#include -#include -#include -#include -#include -#include - -#include -#include "ofi_util.h" -#include "ofi.h" -#include "ofi_str.h" -#include "ofi_prov.h" -#include "ofi_perf.h" -#include "ofi_hmem.h" -#include "rdma/fi_ext.h" #include "lnx.h" static int lnx_cq_close(struct fid *fid) @@ -95,10 +79,9 @@ static void lnx_cq_progress(struct util_cq *cq) gen_lock = &lnx_cq->lcq_lnx_domain->ld_domain.lock; ofi_genlock_lock(gen_lock); - /* Kick the core provider endpoints to progress */ for (i = 0; i < lnx_cq->lcq_lnx_domain->ld_num_doms; i++) { core_cq = &lnx_cq->lcq_core_cqs[i]; - fi_cq_read(core_cq->cc_cq, NULL, 0); + (void) fi_cq_read(core_cq->cc_cq, NULL, 0); } ofi_genlock_unlock(gen_lock); } @@ -111,10 +94,8 @@ static int lnx_open_core_cqs(struct lnx_cq *lnx_cq, struct fi_cq_attr *attr) struct lnx_core_cq *core_cq; struct fi_peer_cq_context cq_ctxt; - /* tell the core providers to import my CQ */ peer_attr.flags |= FI_PEER; - /* create all the core provider completion queues */ for (i = 0; i < lnx_cq->lcq_lnx_domain->ld_num_doms; i++) { cd = &lnx_cq->lcq_lnx_domain->ld_core_domains[i]; core_cq = &lnx_cq->lcq_core_cqs[i]; @@ -122,8 +103,8 @@ static int lnx_open_core_cqs(struct lnx_cq *lnx_cq, struct fi_cq_attr *attr) cq_ctxt.size = sizeof(cq_ctxt); cq_ctxt.cq = lnx_cq->lcq_util_cq.peer_cq; - /* pass my CQ into the open and get back the core's cq */ - rc = fi_cq_open(cd->cd_domain, &peer_attr, &core_cq->cc_cq, &cq_ctxt); + rc = fi_cq_open(cd->cd_domain, &peer_attr, &core_cq->cc_cq, + &cq_ctxt); if (rc) return rc; @@ -154,9 +135,6 @@ int lnx_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, return -FI_ENOMEM; } - /* this is going to be a standard CQ from the read side. 
From the - * write side, it'll use the peer_cq callbacks to write - */ rc = ofi_cq_init(&lnx_prov, domain, attr, &lnx_cq->lcq_util_cq, &lnx_cq_progress, context); if (rc) @@ -166,7 +144,6 @@ int lnx_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, lnx_cq->lcq_util_cq.cq_fid.fid.ops = &lnx_cq_fi_ops; (*cq_fid) = &lnx_cq->lcq_util_cq.cq_fid; - /* open core CQs and tell them to import my CQ */ rc = lnx_open_core_cqs(lnx_cq, attr); return rc; diff --git a/prov/lnx/src/lnx_domain.c b/prov/lnx/src/lnx_domain.c index 9c2e7c7f139..39dade126e1 100644 --- a/prov/lnx/src/lnx_domain.c +++ b/prov/lnx/src/lnx_domain.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2022 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -30,23 +31,6 @@ * SOFTWARE. */ -#include "config.h" - -#include -#include -#include -#include -#include -#include - -#include -#include "ofi_util.h" -#include "ofi.h" -#include "ofi_str.h" -#include "ofi_prov.h" -#include "ofi_perf.h" -#include "ofi_hmem.h" -#include "rdma/fi_ext.h" #include "lnx.h" static struct fi_ops_domain lnx_domain_ops = { @@ -71,7 +55,6 @@ static int lnx_domain_close(struct fid *fid) domain = container_of(fid, struct lnx_domain, ld_domain.domain_fid.fid); - /* close all the open core domains */ for (i = 0; i < domain->ld_num_doms; i++) { cd = &domain->ld_core_domains[i]; @@ -108,8 +91,8 @@ static struct fi_ops_mr lnx_mr_ops = { .regattr = lnx_mr_regattr, }; -static int lnx_open_core_domains(struct lnx_fabric *lnx_fab, - void *context, struct lnx_domain *lnx_domain) +static int lnx_open_core_domains(struct lnx_fabric *lnx_fab, void *context, + struct lnx_domain *lnx_dom) { int rc, i; char *prov_name; @@ -120,22 +103,23 @@ static int lnx_open_core_domains(struct lnx_fabric *lnx_fab, for (i = 0; i < lnx_fab->lf_num_fabs; i++) { cf = &lnx_fab->lf_core_fabrics[i]; for (itr = 
cf->cf_info; itr; itr = itr->next) - lnx_domain->ld_num_doms++; + lnx_dom->ld_num_doms++; } - if (lnx_domain->ld_num_doms >= LNX_MAX_LOCAL_EPS) { + if (lnx_dom->ld_num_doms >= LNX_MAX_LOCAL_EPS) { FI_WARN(&lnx_prov, FI_LOG_FABRIC, "Too many domains to link. Maximum allowed: %d\n", LNX_MAX_LOCAL_EPS); return -FI_E2BIG; } - lnx_domain->ld_core_domains = calloc(sizeof(*lnx_domain->ld_core_domains), - lnx_domain->ld_num_doms); - if (!lnx_domain->ld_core_domains) + lnx_dom->ld_core_domains = calloc( + lnx_dom->ld_num_doms, + sizeof(*lnx_dom->ld_core_domains)); + if (!lnx_dom->ld_core_domains) return -FI_ENOMEM; - lnx_domain->ld_num_doms = 0; + lnx_dom->ld_num_doms = 0; for (i = 0; i < lnx_fab->lf_num_fabs; i++) { cf = &lnx_fab->lf_core_fabrics[i]; @@ -149,13 +133,13 @@ static int lnx_open_core_domains(struct lnx_fabric *lnx_fab, setenv("FI_CXI_RX_MATCH_MODE", "software", 1); for (itr = cf->cf_info; itr; itr = itr->next) { - /* The shm domain should now already be at the head of the list. + /* The shm domain should be at the head of the list. * This will cause all the other shm constructs to * be the head of their respective lists, ex: av. * The purpose is to optimize the shm path for * local peers. 
*/ - cd = &lnx_domain->ld_core_domains[lnx_domain->ld_num_doms]; + cd = &lnx_dom->ld_core_domains[lnx_dom->ld_num_doms]; cd->cd_info = itr; @@ -164,7 +148,7 @@ static int lnx_open_core_domains(struct lnx_fabric *lnx_fab, if (rc) return rc; - lnx_domain->ld_num_doms++; + lnx_dom->ld_num_doms++; cd->cd_fabric = cf; } } @@ -173,14 +157,16 @@ static int lnx_open_core_domains(struct lnx_fabric *lnx_fab, } int lnx_domain_open(struct fid_fabric *fabric, struct fi_info *info, - struct fid_domain **domain, void *context) + struct fid_domain **domain, void *context) { int rc = 0; struct lnx_domain *lnx_domain; struct util_domain *dom; struct ofi_bufpool_attr bp_attrs = {0}; - struct lnx_fabric *lnx_fab = container_of(fabric, struct lnx_fabric, - lf_util_fabric.fabric_fid); + struct lnx_fabric *lnx_fab; + + lnx_fab= container_of(fabric, struct lnx_fabric, + lf_util_fabric.fabric_fid); rc = -FI_ENOMEM; lnx_domain = calloc(sizeof(*lnx_domain), 1); @@ -209,7 +195,8 @@ int lnx_domain_open(struct fid_fabric *fabric, struct fi_info *info, rc = lnx_open_core_domains(lnx_fab, context, lnx_domain); if (rc) { - FI_INFO(&lnx_prov, FI_LOG_CORE, "Failed to initialize domain for %s\n", + FI_INFO(&lnx_prov, FI_LOG_CORE, + "Failed to initialize domain for %s\n", info->domain_attr->name); goto close_domain; } @@ -231,4 +218,3 @@ int lnx_domain_open(struct fid_fabric *fabric, struct fi_info *info, out: return rc; } - diff --git a/prov/lnx/src/lnx_ep.c b/prov/lnx/src/lnx_ep.c index c466a389a30..528cca92bed 100644 --- a/prov/lnx/src/lnx_ep.c +++ b/prov/lnx/src/lnx_ep.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2022 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -30,39 +31,148 @@ * SOFTWARE. 
*/ -#include "config.h" - -#include -#include -#include -#include -#include -#include - -#include -#include "ofi_util.h" -#include "ofi.h" -#include "ofi_str.h" -#include "ofi_prov.h" -#include "ofi_perf.h" -#include "ofi_hmem.h" -#include "rdma/fi_ext.h" #include "lnx.h" -extern struct fi_ops_cm lnx_cm_ops; -extern struct fi_ops_msg lnx_msg_ops; -extern struct fi_ops_tagged lnx_tagged_ops; -extern struct fi_ops_rma lnx_rma_ops; -extern struct fi_ops_atomic lnx_atomic_ops; - -static struct fi_ops_srx_owner lnx_srx_ops = { - .size = sizeof(struct fi_ops_srx_owner), - .get_msg = lnx_get_msg, - .get_tag = lnx_get_tag, - .queue_msg = lnx_queue_msg, - .queue_tag = lnx_queue_tag, - .free_entry = lnx_free_entry, - .foreach_unspec_addr = lnx_foreach_unspec_addr, +/* + * Round robin over the end points and peer addresses per message + * This does not take into consideration send after send requirements. + */ +static inline int lnx_select_send_endpoints_msg( + struct lnx_ep *lep, fi_addr_t lnx_addr, + struct lnx_core_ep **cep_out, fi_addr_t *core_addr) +{ + int idx, rr; + struct lnx_peer *lp; + struct lnx_core_ep *cep; + struct lnx_peer_map *map_addr; + struct lnx_peer_ep_map *ep_map; + + lp = lnx_av_lookup_addr(lep->le_lav, lnx_addr); + if (!lp) + return -FI_ENOSYS; + + /* round robin over the endpoints which can reach this peer */ + rr = ofi_atomic_inc32(&lp->lp_ep_rr) - 1; + ep_map = &lp->lp_src_eps[lep->le_idx]; + idx = rr % ep_map->pem_num_eps; + cep = ep_map->pem_eps[idx]; + + map_addr = ofi_bufpool_get_ibuf(cep->cep_cav->cav_map, lp->lp_addr); + + /* round robin over available peer addresses */ + rr = ofi_atomic_inc32(&map_addr->map_rr) - 1; + idx = rr % (map_addr->map_count); + + *core_addr = map_addr->map_addrs[idx]; + + *cep_out = cep; + + return FI_SUCCESS; +} + +/* + * Round robin over the end points, assigning each peer a particular local + * endpoint and one of its core addresses to use. 
This ensures that the + * peer receives messages in the same order they are sent. + */ +static inline int lnx_select_send_endpoints_peer( + struct lnx_ep *lep, fi_addr_t lnx_addr, + struct lnx_core_ep **cep_out, fi_addr_t *core_addr) +{ + bool found = false; + int idx, i; + uint32_t rr, seed; + struct lnx_peer *lp; + struct lnx_core_ep *cep; + struct lnx_peer_map *map_addr; + struct lnx_peer_ep_map *ep_map; + + lp = lnx_av_lookup_addr(lep->le_lav, lnx_addr); + if (!lp) + return -FI_ENOSYS; + + if (lp->lp_locked_cep) { + *core_addr = lp->lp_locked_core_addr; + *cep_out = lp->lp_locked_cep; + return FI_SUCCESS; + } + + ep_map = &lp->lp_src_eps[lep->le_idx]; + + while (!found) { + rr = ofi_atomic_inc32(&lep->le_rr) - 1; + idx = rr % lep->le_domain->ld_num_doms; + cep = &lep->le_core_eps[idx]; + + for (i = 0; i < ep_map->pem_num_eps; i++) { + if (cep == ep_map->pem_eps[i]) { + found = true; + break; + } + } + } + + map_addr = ofi_bufpool_get_ibuf(cep->cep_cav->cav_map, lp->lp_addr); + + /* randomize the address selection from the list of reachable + * peer addresses. This will allow different processes to send to + * different peer addresses, instead of using the first address of + * a peer. This is not round robin over the peer addresses, but + * in the PER_PEER case, there is no effective way to enforce + * round robin across independent peers. In this method it at + * least allows spreading traffic across peer addresses. */ + seed = (uint32_t)ofi_gettime_ns(); + rr = ofi_xorshift_random(seed); + idx = rr % (map_addr->map_count); + + *core_addr = lp->lp_locked_core_addr = map_addr->map_addrs[idx]; + *cep_out = lp->lp_locked_cep = cep; + + return FI_SUCCESS; +} + +static inline void lnx_set_send_pair_noop(struct lnx_ep *lep, + struct lnx_core_ep *cep, + fi_addr_t addr) +{ + /* no-op */ +} + +/* if you haven't sent to a particular peer before then lock the + * address you're going to send on. 
If this is the first message + * you receive from peer, then you want to respond on the same + * address and core endpoint that peer used. This will ensure + * symmetry. +*/ +static inline void lnx_set_send_pair_peer(struct lnx_ep *lep, + struct lnx_core_ep *cep, + fi_addr_t addr) +{ + fi_addr_t core_addr, prim_addr; + struct lnx_peer *lp; + + if (addr == FI_ADDR_UNSPEC) + return; + + prim_addr = lnx_decode_primary_id(addr); + lp = lnx_av_lookup_addr(lep->le_lav, prim_addr); + if (!lp->lp_locked_cep) { + core_addr = lnx_get_core_addr(cep, addr); + lp->lp_locked_core_addr = core_addr; + lp->lp_locked_cep = cep; + } +} + +lnx_select_tx_ep lnx_select_send_endpoints[LNX_MR_SELECTION_MAX] = +{ + [LNX_MR_SELECTION_PER_MSG] = &lnx_select_send_endpoints_msg, + [LNX_MR_SELECTION_PER_PEER] = &lnx_select_send_endpoints_peer, +}; + +lnx_set_tx_pair lnx_set_send_pair[LNX_MR_SELECTION_MAX] = +{ + [LNX_MR_SELECTION_PER_MSG] = &lnx_set_send_pair_noop, + [LNX_MR_SELECTION_PER_PEER] = &lnx_set_send_pair_peer, }; static inline void lnx_dump_core_ep_stats(struct lnx_core_ep *cep) @@ -72,8 +182,8 @@ static inline void lnx_dump_core_ep_stats(struct lnx_core_ep *cep) tstats = &cep->cep_t_stats; FI_TRACE(&lnx_prov, FI_LOG_DOMAIN, "%s,%" PRIu64 ",%" PRIu64 - ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" - PRIu64 ",%" PRIu64 "\n", + ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" + PRIu64 ",%" PRIu64 "\n", cep->cep_domain->cd_info->domain_attr->name, tstats->st_num_tsend, tstats->st_num_tsendv, tstats->st_num_tsendmsg, tstats->st_num_tsenddata, @@ -81,15 +191,15 @@ static inline void lnx_dump_core_ep_stats(struct lnx_core_ep *cep) tstats->st_num_posted_recvs, tstats->st_num_unexp_msgs); } -static inline void -lnx_dump_srx_queue_stats(struct lnx_ep *lep) +static inline void lnx_dump_srx_queue_stats(struct lnx_ep *lep) { static bool header; if (!header) { FI_TRACE(&lnx_prov, FI_LOG_DOMAIN, "Domain name,tsend,tsendv,tsendmsg,tsenddata," - 
"tinject,tinjectdata,posted_recvs,unexp_msgs,max_queue,avg_queue\n"); + "tinject,tinjectdata,posted_recvs,unexp_msgs," + "max_queue,avg_queue\n"); header = true; } @@ -103,24 +213,21 @@ lnx_dump_srx_queue_stats(struct lnx_ep *lep) lep->le_srq.lps_trecv.lqp_unexq.lq_rolling_avg); } -int lnx_ep_close(struct fid *fid) +static int lnx_ep_close(struct fid *fid) { int i, rc, frc = FI_SUCCESS; struct lnx_ep *lep; struct lnx_core_ep *cep; - int dump_stats; lep = container_of(fid, struct lnx_ep, le_ep.ep_fid.fid); - fi_param_get_bool(&lnx_prov, "dump_stats", &dump_stats); - - if (dump_stats) + if (lnx_env.dump_stats) lnx_dump_srx_queue_stats(lep); for (i = 0; i < lep->le_domain->ld_num_doms; i++) { cep = &lep->le_core_eps[i]; - if (dump_stats) + if (lnx_env.dump_stats) lnx_dump_core_ep_stats(cep); rc = fi_close(&cep->cep_ep->fid); @@ -226,7 +333,8 @@ static int lnx_bind_core_cqs(struct lnx_ep *lep, struct lnx_cq *lcq, return 0; } -int lnx_bind_core_avs(struct lnx_ep *lep, struct lnx_av *lav, uint64_t flags) +static int lnx_bind_core_avs(struct lnx_ep *lep, struct lnx_av *lav, + uint64_t flags) { int rc, i; struct lnx_core_ep *cep; @@ -248,8 +356,7 @@ int lnx_bind_core_avs(struct lnx_ep *lep, struct lnx_av *lav, uint64_t flags) return 0; } -static int -lnx_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags) +static int lnx_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags) { int rc = 0; struct lnx_ep *lep; @@ -313,7 +420,7 @@ lnx_ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags) return rc; } -int lnx_getname(fid_t fid, void *addr, size_t *addrlen) +static int lnx_getname(fid_t fid, void *addr, size_t *addrlen) { int i, rc; char hostname[FI_NAME_MAX]; @@ -344,11 +451,9 @@ int lnx_getname(fid_t fid, void *addr, size_t *addrlen) /* query the core address size and use it to calculate the * total size of the lnx address */ rc = fi_getname(&cep->cep_ep->fid, NULL, &eps_addrlen[i]); - if (rc == -FI_ETOOSMALL) { - size += (sizeof(*ep_addr)) + eps_addrlen[i]; 
- } else { + if (rc != -FI_ETOOSMALL) return -FI_EINVAL; - } + size += (sizeof(*ep_addr)) + eps_addrlen[i]; } if (!addr || *addrlen < size) { @@ -368,7 +473,8 @@ int lnx_getname(fid_t fid, void *addr, size_t *addrlen) prov_name = cep->cep_domain->cd_info->fabric_attr->name; memcpy(ep_addr->lea_prov, prov_name, strlen(prov_name)+1); cep_addr = (char*)ep_addr + sizeof(*ep_addr); - rc = fi_getname(&cep->cep_ep->fid, (void*)cep_addr, &eps_addrlen[i]); + rc = fi_getname(&cep->cep_ep->fid, (void*)cep_addr, + &eps_addrlen[i]); if (rc) return rc; ep_addr->lea_addr_size = eps_addrlen[i]; @@ -412,14 +518,14 @@ static ssize_t lnx_ep_cancel(fid_t fid, void *context) return rc; } -static int lnx_ep_getopt(fid_t fid, int level, int optname, - void *optval, size_t *optlen) +static int lnx_ep_getopt(fid_t fid, int level, int optname, void *optval, + size_t *optlen) { return -FI_ENOPROTOOPT; } static int lnx_ep_setopt(fid_t fid, int level, int optname, const void *optval, - size_t optlen) + size_t optlen) { struct lnx_ep *lep; struct lnx_core_ep *cep; @@ -430,8 +536,8 @@ static int lnx_ep_setopt(fid_t fid, int level, int optname, const void *optval, for (i = 0; i < lep->le_domain->ld_num_doms; i++) { cep = &lep->le_core_eps[i]; - rc = fi_setopt(&cep->cep_ep->fid, level, optname, - optval, optlen); + rc = fi_setopt(&cep->cep_ep->fid, level, optname, optval, + optlen); if (rc == -FI_ENOSYS) { rc = 0; continue; @@ -443,7 +549,7 @@ static int lnx_ep_setopt(fid_t fid, int level, int optname, const void *optval, return rc; } -struct fi_ops_ep lnx_ep_ops = { +static struct fi_ops_ep lnx_ep_ops = { .size = sizeof(struct fi_ops_ep), .cancel = lnx_ep_cancel, /* can't get opt, because there is no way to report multiple @@ -456,7 +562,7 @@ struct fi_ops_ep lnx_ep_ops = { .tx_size_left = fi_no_tx_size_left, }; -struct fi_ops lnx_ep_fi_ops = { +static struct fi_ops lnx_ep_fi_ops = { .size = sizeof(struct fi_ops), .close = lnx_ep_close, .bind = lnx_ep_bind, @@ -464,7 +570,7 @@ struct fi_ops 
lnx_ep_fi_ops = { .ops_open = fi_no_ops_open, }; -struct fi_ops_cm lnx_cm_ops = { +static struct fi_ops_cm lnx_cm_ops = { .size = sizeof(struct fi_ops_cm), .setname = fi_no_setname, .getname = lnx_getname, @@ -476,8 +582,7 @@ struct fi_ops_cm lnx_cm_ops = { .shutdown = fi_no_shutdown, }; -static int -lnx_open_core_eps(struct lnx_ep *lep, void *context) +static int lnx_open_core_eps(struct lnx_ep *lep, void *context) { int i, rc = 0; struct lnx_core_domain *cd; @@ -495,8 +600,8 @@ lnx_open_core_eps(struct lnx_ep *lep, void *context) cep->cep_domain = cd; cep->cep_parent = lep; - rc = fi_endpoint(cd->cd_domain, cd->cd_info, - &cep->cep_ep, context); + rc = fi_endpoint(cd->cd_domain, cd->cd_info, &cep->cep_ep, + context); if (rc) goto fail; } @@ -512,8 +617,7 @@ lnx_open_core_eps(struct lnx_ep *lep, void *context) return rc; } -static void -lnx_ep_nosys_progress(struct util_ep *util_ep) +static void lnx_ep_nosys_progress(struct util_ep *util_ep) { assert(0); } @@ -553,9 +657,9 @@ static int lnx_match_unexq(struct dlist_entry *item, const void *args) attr->lm_ignore); } -static inline void -lnx_init_qpair(struct lnx_qpair *qp, dlist_func_t *recvq_match_func, - dlist_func_t *unexq_match_func) +static inline void lnx_init_qpair(struct lnx_qpair *qp, + dlist_func_t *recvq_match_func, + dlist_func_t *unexq_match_func) { dlist_init(&qp->lqp_recvq.lq_queue); dlist_init(&qp->lqp_unexq.lq_queue); @@ -563,9 +667,9 @@ lnx_init_qpair(struct lnx_qpair *qp, dlist_func_t *recvq_match_func, qp->lqp_unexq.lq_match_func = unexq_match_func; } -static int -lnx_alloc_endpoint(struct fid_domain *domain, struct fi_info *info, - struct lnx_ep **out_ep, void *context, size_t fclass) +static int lnx_alloc_endpoint(struct fid_domain *domain, struct fi_info *info, + struct lnx_ep **out_ep, void *context, + size_t fclass) { int rc; struct lnx_ep *lep; @@ -586,8 +690,9 @@ lnx_alloc_endpoint(struct fid_domain *domain, struct fi_info *info, lep->le_ep.ep_fid.rma = &lnx_rma_ops; 
lep->le_ep.ep_fid.atomic = &lnx_atomic_ops; lep->le_domain = container_of(domain, struct lnx_domain, - ld_domain.domain_fid); - lnx_init_qpair(&lep->le_srq.lps_trecv, lnx_match_recvq, lnx_match_unexq); + ld_domain.domain_fid); + lnx_init_qpair(&lep->le_srq.lps_trecv, lnx_match_recvq, + lnx_match_unexq); lnx_init_qpair(&lep->le_srq.lps_recv, lnx_match_recvq, lnx_match_unexq); ofi_genlock_lock(&lep->le_domain->ld_domain.lock); @@ -618,12 +723,12 @@ lnx_alloc_endpoint(struct fid_domain *domain, struct fi_info *info, goto fail; rc = lnx_open_core_eps(lep, context); - if (rc) + if (rc) goto fail; rc = ofi_endpoint_init(domain, (const struct util_prov *)&lnx_util_prov, - info, &lep->le_ep, - context, lnx_ep_nosys_progress); + info, &lep->le_ep, context, + lnx_ep_nosys_progress); if (rc) goto fail; @@ -642,39 +747,12 @@ int lnx_endpoint(struct fid_domain *domain, struct fi_info *info, { int rc; struct lnx_ep *my_ep; - char *mr_selection; - enum lnx_multirail_selection mr; - - rc = fi_param_get_str(&lnx_prov, "multi_rail_selection", - &mr_selection); - if (rc == -FI_ENOENT || rc == -FI_ENODATA) { - /* if the user makes no selection, pick the safest method - * which ensures send after send is satisfied - */ - mr = LNX_MR_SELECTION_PER_PEER; - goto create_ep; - } else if (rc) { - return rc; - } - if (strncasecmp(LNX_PER_MSG_SELECTION_STR, mr_selection, - strlen(mr_selection)) == 0) { - mr = LNX_MR_SELECTION_PER_MSG; - } else if (strncasecmp(LNX_PER_PEER_SELECTION_STR, mr_selection, - strlen(mr_selection)) == 0) { - mr = LNX_MR_SELECTION_PER_PEER; - } else { - FI_WARN(&lnx_prov, FI_LOG_CORE, - "Unknown multi-rail selection policy: %s\n", mr_selection); - return FI_EINVAL; - } - -create_ep: rc = lnx_alloc_endpoint(domain, info, &my_ep, context, FI_CLASS_EP); if (rc) return rc; - my_ep->le_mr = mr; + my_ep->le_mr = lnx_env.mr; ofi_atomic_initialize32(&my_ep->le_rr, 0); *ep = &my_ep->le_ep.ep_fid; @@ -682,4 +760,32 @@ int lnx_endpoint(struct fid_domain *domain, struct fi_info 
*info, return 0; } +struct fi_ops_rma lnx_rma_ops = { + .size = sizeof(struct fi_ops_rma), + .read = fi_no_rma_read, + .readv = fi_no_rma_readv, + .readmsg = fi_no_rma_readmsg, + .write = fi_no_rma_write, + .writev = fi_no_rma_writev, + .writemsg = fi_no_rma_writemsg, + .inject = fi_no_rma_inject, + .writedata = fi_no_rma_writedata, + .injectdata = fi_no_rma_injectdata, +}; +struct fi_ops_atomic lnx_atomic_ops = { + .size = sizeof(struct fi_ops_atomic), + .write = fi_no_atomic_write, + .writev = fi_no_atomic_writev, + .writemsg = fi_no_atomic_writemsg, + .inject = fi_no_atomic_inject, + .readwrite = fi_no_atomic_readwrite, + .readwritev = fi_no_atomic_readwritev, + .readwritemsg = fi_no_atomic_readwritemsg, + .compwrite = fi_no_atomic_compwrite, + .compwritev = fi_no_atomic_compwritev, + .compwritemsg = fi_no_atomic_compwritemsg, + .writevalid = fi_no_atomic_writevalid, + .readwritevalid = fi_no_atomic_readwritevalid, + .compwritevalid = fi_no_atomic_compwritevalid, +}; diff --git a/prov/lnx/src/lnx_init.c b/prov/lnx/src/lnx_init.c index e4831957213..a82554fe3dd 100644 --- a/prov/lnx/src/lnx_init.c +++ b/prov/lnx/src/lnx_init.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2022 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -30,24 +31,9 @@ * SOFTWARE. 
*/ -#include "config.h" - -#include -#include -#include -#include -#include -#include - -#include -#include "ofi_util.h" -#include "ofi.h" -#include "ofi_str.h" -#include "ofi_prov.h" -#include "ofi_perf.h" -#include "ofi_hmem.h" -#include "rdma/fi_ext.h" #include "lnx.h" +#include "ofi_prov.h" +#include #define LNX_PASSTHRU_TX_OP_FLAGS (FI_INJECT_COMPLETE | \ FI_TRANSMIT_COMPLETE | \ @@ -57,24 +43,34 @@ FI_DELIVERY_COMPLETE | FI_TRANSMIT_COMPLETE) #define LNX_RX_OP_FLAGS (FI_COMPLETION) +#define LNX_TX_CAPS (FI_TAGGED | FI_SEND | FI_HMEM) +#define LNX_RX_CAPS (FI_TAGGED | FI_RECV | FI_DIRECTED_RECV | \ + FI_HMEM) + ofi_spin_t global_bplock; struct ofi_bufpool *global_recv_bp = NULL; struct util_fabric lnx_fabric_info; +struct lnx_env lnx_env = { + .mr = LNX_MR_SELECTION_PER_PEER, + .prov_links = NULL, + .disable_shm = false, + .dump_stats = false, +}; struct fi_tx_attr lnx_tx_attr = { - .caps = ~0x0ULL, + .caps = LNX_TX_CAPS, .op_flags = LNX_PASSTHRU_TX_OP_FLAGS | LNX_TX_OP_FLAGS, .msg_order = ~0x0ULL, .comp_order = 0, .inject_size = SIZE_MAX, .size = SIZE_MAX, .iov_limit = LNX_IOV_LIMIT, - .rma_iov_limit = LNX_IOV_LIMIT, + .rma_iov_limit = LNX_IOV_LIMIT, }; struct fi_rx_attr lnx_rx_attr = { - .caps = ~0x0ULL, + .caps = LNX_RX_CAPS, .op_flags = LNX_PASSTHRU_RX_OP_FLAGS | LNX_RX_OP_FLAGS, .msg_order = ~0x0ULL, .comp_order = 0, @@ -92,9 +88,9 @@ struct fi_ep_attr lnx_ep_attr = { .max_order_raw_size = SIZE_MAX, .max_order_war_size = SIZE_MAX, .max_order_waw_size = SIZE_MAX, - .mem_tag_format = FI_TAG_GENERIC, - .tx_ctx_cnt = SIZE_MAX, - .rx_ctx_cnt = SIZE_MAX, + .mem_tag_format = FI_TAG_GENERIC, + .tx_ctx_cnt = 0, + .rx_ctx_cnt = 0, .auth_key = NULL, .auth_key_size = 0, }; @@ -106,17 +102,17 @@ struct fi_domain_attr lnx_domain_attr = { .data_progress = FI_PROGRESS_AUTO, .resource_mgmt = FI_RM_ENABLED, .av_type = FI_AV_TABLE, - .mr_mode = FI_MR_RAW, + .mr_mode = 0, .mr_key_size = SIZE_MAX, .cq_data_size = SIZE_MAX, .cq_cnt = SIZE_MAX, .ep_cnt = SIZE_MAX, - .tx_ctx_cnt 
= SIZE_MAX, - .rx_ctx_cnt = SIZE_MAX, - .max_ep_tx_ctx = SIZE_MAX, - .max_ep_rx_ctx = SIZE_MAX, - .max_ep_stx_ctx = SIZE_MAX, - .max_ep_srx_ctx = SIZE_MAX, + .tx_ctx_cnt = 0, + .rx_ctx_cnt = 0, + .max_ep_tx_ctx = 0, + .max_ep_rx_ctx = 0, + .max_ep_stx_ctx = 0, + .max_ep_srx_ctx = 0, .cntr_cnt = SIZE_MAX, .mr_iov_limit = SIZE_MAX, .caps = ~0x0ULL, @@ -131,7 +127,7 @@ struct fi_fabric_attr lnx_fabric_attr = { }; struct fi_info lnx_info = { - .caps = ~0x0ULL, + .caps = LNX_TX_CAPS | LNX_RX_CAPS, .tx_attr = &lnx_tx_attr, .rx_attr = &lnx_rx_attr, .ep_attr = &lnx_ep_attr, @@ -139,6 +135,32 @@ struct fi_info lnx_info = { .fabric_attr = &lnx_fabric_attr }; +static int lnx_fabric_close(struct fid *fid) +{ + int rc, i, frc = 0; + struct lnx_core_fabric *cf; + struct lnx_fabric *lnx_fab; + + lnx_fab = container_of(fid, struct lnx_fabric, + lf_util_fabric.fabric_fid.fid); + + for (i = 0; i < lnx_fab->lf_num_fabs; i++) { + cf = &lnx_fab->lf_core_fabrics[i]; + + rc = fi_close(&cf->cf_fabric->fid); + if (rc) + frc = rc; + } + + rc = ofi_fabric_close(&lnx_fab->lf_util_fabric); + if (rc) + frc = rc; + + free(lnx_fab->lf_core_fabrics); + + return frc; +} + static struct fi_ops lnx_fabric_fi_ops = { .size = sizeof(struct fi_ops), .close = lnx_fabric_close, @@ -162,7 +184,6 @@ static int lnx_wait_open(struct fid_fabric *fabric_fid, } } - static struct fi_ops_fabric lnx_fabric_ops = { .size = sizeof(struct fi_ops_fabric), .domain = lnx_domain_open, @@ -175,22 +196,6 @@ static struct fi_ops_fabric lnx_fabric_ops = { .trywait = fi_no_trywait }; -struct fi_provider lnx_prov = { - .name = OFI_LNX, - .version = OFI_VERSION_DEF_PROV, - .fi_version = OFI_VERSION_LATEST, - .getinfo = lnx_getinfo, - .fabric = lnx_fabric, - .cleanup = lnx_fini -}; - -struct util_prov lnx_util_prov = { - .prov = &lnx_prov, - .info = &lnx_info, - .flags = 0 -}; - -/* this is a list of all possible links */ struct dlist_entry lnx_links; struct lnx_fi_cache_entry { @@ -224,15 +229,14 @@ static void lnx_trim(char 
*str) str[i] = '\0'; } - -void lnx_free_links(struct dlist_entry *links) +static void lnx_free_links(struct dlist_entry *links) { struct lnx_link_info *link; struct lnx_fi_cache_entry *e; struct dlist_entry *tmp, *tmp2; - dlist_foreach_container_safe(links, struct lnx_link_info, link, - entry, tmp) { + dlist_foreach_container_safe(links, struct lnx_link_info, link, entry, + tmp) { dlist_foreach_container_safe(&link->link_providers, struct lnx_fi_cache_entry, e, entry, tmp2) { @@ -245,8 +249,8 @@ void lnx_free_links(struct dlist_entry *links) } } -static int lnx_cache_info(struct dlist_entry *head, - struct fi_info *info, bool new_prov) +static int lnx_cache_info(struct dlist_entry *head, struct fi_info *info, + bool new_prov) { struct lnx_fi_cache_entry *e; struct fi_info *itr; @@ -287,13 +291,11 @@ static int lnx_cache_info(struct dlist_entry *head, return 0; } -static struct lnx_link_info * -lnx_get_link_by_dom(char *domain_name) +static struct lnx_link_info *lnx_get_link_by_dom(char *domain_name) { struct lnx_link_info *link; - dlist_foreach_container(&lnx_links, struct lnx_link_info, link, - entry) { + dlist_foreach_container(&lnx_links, struct lnx_link_info, link, entry) { if (!strcmp(domain_name, link->fi_link->domain_attr->name)) return link; } @@ -301,12 +303,13 @@ lnx_get_link_by_dom(char *domain_name) return NULL; } -static int lnx_generate_link_info(struct fi_info **info, const struct fi_info *hints) +static int lnx_generate_link_info(uint32_t version, const struct fi_info *hints, + struct fi_info **info) { struct fi_info *itr, *fi = NULL, *next; struct lnx_link_info *link; struct lnx_fi_cache_entry *e; - size_t min_inject_size = SIZE_MAX, min_of_max_msg_size = SIZE_MAX; + size_t min_inject_size = SIZE_MAX, min_max_msg_size = SIZE_MAX; size_t min_rx_size = SIZE_MAX, min_tx_size = SIZE_MAX; size_t min_iov_limit = LNX_IOV_LIMIT; int mr_mode = 0; @@ -316,10 +319,9 @@ static int lnx_generate_link_info(struct fi_info **info, const struct fi_info *h char 
*link_name; char *prov_name; bool reset; - uint64_t caps = 0; + uint64_t tx_caps = 0, rx_caps = 0; - dlist_foreach_container(&lnx_links, struct lnx_link_info, link, - entry) { + dlist_foreach_container(&lnx_links, struct lnx_link_info, link, entry) { link_name = calloc(sizeof(char), str_len); if (!link_name) return -FI_ENOMEM; @@ -349,16 +351,19 @@ static int lnx_generate_link_info(struct fi_info **info, const struct fi_info *h mr_mode |= e->fi->domain_attr->mr_mode; if (reset) { - caps = e->fi->caps; - min_inject_size = min_of_max_msg_size = SIZE_MAX; + tx_caps = LNX_TX_CAPS & e->fi->tx_attr->caps; + rx_caps = LNX_RX_CAPS & e->fi->rx_attr->caps; + min_inject_size = min_max_msg_size = SIZE_MAX; min_rx_size = min_tx_size = SIZE_MAX; min_iov_limit = LNX_IOV_LIMIT; reset = false; } else { - caps &= e->fi->caps; + tx_caps &= e->fi->tx_attr->caps; + rx_caps &= e->fi->rx_attr->caps; } - if (e->fi->ep_attr->max_msg_size < min_of_max_msg_size) - min_of_max_msg_size = e->fi->ep_attr->max_msg_size; + if (e->fi->ep_attr->max_msg_size < min_max_msg_size) + min_max_msg_size = + e->fi->ep_attr->max_msg_size; if (e->fi->tx_attr->inject_size < min_inject_size) min_inject_size = e->fi->tx_attr->inject_size; if (e->fi->tx_attr->iov_limit < min_iov_limit) @@ -375,7 +380,8 @@ static int lnx_generate_link_info(struct fi_info **info, const struct fi_info *h tmp[0] = '+'; tmp++; } - memcpy(tmp, itr->domain_attr->name, len); + memcpy(tmp, itr->domain_attr->name, + len); tmp += (len - 1); } else { str_len = str_len * 2 + len; @@ -389,13 +395,14 @@ static int lnx_generate_link_info(struct fi_info **info, const struct fi_info *h tmp2 = link_name + str_len - 1; tmp[0] = '+'; tmp++; - memcpy(tmp, itr->domain_attr->name, len); + memcpy(tmp, itr->domain_attr->name, + len); tmp += (len - 1); } } } - link_name = realloc(link_name, strlen(link_name)+1); + link_name = realloc(link_name, strlen(link_name) + 1); free(link->fi_link->fabric_attr->prov_name); free(link->fi_link->fabric_attr->name); 
free(link->fi_link->domain_attr->name); @@ -410,17 +417,26 @@ static int lnx_generate_link_info(struct fi_info **info, const struct fi_info *h next = fi_dupinfo(link->fi_link); next->ep_attr->protocol = FI_PROTO_LNX; next->domain_attr->mr_mode = mr_mode; - next->domain_attr->rx_ctx_cnt = lnx_util_prov.info->domain_attr->rx_ctx_cnt; - next->domain_attr->tx_ctx_cnt = lnx_util_prov.info->domain_attr->tx_ctx_cnt; - next->ep_attr->rx_ctx_cnt = lnx_util_prov.info->ep_attr->rx_ctx_cnt; - next->ep_attr->tx_ctx_cnt = lnx_util_prov.info->ep_attr->tx_ctx_cnt; - if (hints) - next->caps = caps & hints->caps; - else - next->caps = caps; + next->domain_attr->rx_ctx_cnt = + lnx_util_prov.info->domain_attr->rx_ctx_cnt; + next->domain_attr->tx_ctx_cnt = + lnx_util_prov.info->domain_attr->tx_ctx_cnt; + next->ep_attr->rx_ctx_cnt = + lnx_util_prov.info->ep_attr->rx_ctx_cnt; + next->ep_attr->tx_ctx_cnt = + lnx_util_prov.info->ep_attr->tx_ctx_cnt; + + if (!hints || !hints->caps) { + next->tx_attr->caps = tx_caps; + next->rx_attr->caps = rx_caps; + } else { + next->tx_attr->caps = (tx_caps & hints->caps); + next->rx_attr->caps = (rx_caps & hints->caps); + } + next->caps = next->tx_attr->caps | next->rx_attr->caps; next->tx_attr->inject_size = min_inject_size; next->tx_attr->iov_limit = min_iov_limit; - next->ep_attr->max_msg_size = min_of_max_msg_size; + next->ep_attr->max_msg_size = min_max_msg_size; next->rx_attr->size = min_rx_size; next->tx_attr->size = min_tx_size; @@ -441,8 +457,7 @@ static int lnx_generate_link_info(struct fi_info **info, const struct fi_info *h return FI_SUCCESS; } -static struct fi_info * -lnx_filter_info(struct fi_info *fi, char *domain) +static struct fi_info *lnx_filter_info(struct fi_info *fi, char *domain) { struct fi_info *itr, *tmp, *filtered_head = NULL, *filtered_tail = NULL; @@ -478,9 +493,9 @@ lnx_filter_info(struct fi_info *fi, char *domain) return (filtered_head) ? 
filtered_head : fi; } -int lnx_getinfo_helper(uint32_t version, char *prov, char *domain, - struct fi_info *lnx_hints, bool new_prov, - struct dlist_entry *link) +static int lnx_getinfo_helper(uint32_t version, char *prov, char *domain, + struct fi_info *lnx_hints, bool new_prov, + struct dlist_entry *link) { int rc; char *orig_prov_name = NULL, *orig_dom_name = NULL; @@ -503,8 +518,8 @@ int lnx_getinfo_helper(uint32_t version, char *prov, char *domain, lnx_hints->domain_attr->caps &= ~(FI_REMOTE_COMM); } - rc = fi_getinfo(version, NULL, NULL, OFI_GETINFO_HIDDEN, - lnx_hints, &core_info); + rc = fi_getinfo(version, NULL, NULL, OFI_GETINFO_HIDDEN, lnx_hints, + &core_info); lnx_hints->fabric_attr->prov_name = orig_prov_name; lnx_hints->domain_attr->name = orig_dom_name; @@ -521,15 +536,57 @@ int lnx_getinfo_helper(uint32_t version, char *prov, char *domain, return rc; } -int lnx_getinfo(uint32_t version, const char *node, const char *service, - uint64_t flags, const struct fi_info *hints, - struct fi_info **info) +static int lnx_init_env(void) +{ + char *mr_selection; + int rc; + + rc = fi_param_get_str(&lnx_prov, "prov_links", &lnx_env.prov_links); + if (rc) { + FI_WARN(&lnx_prov, FI_LOG_FABRIC, + "Error getting prov_links environment variable " + "lnx required FI_LNX_PROV_LINKS to determine links\n"); + return -FI_EINVAL; + } + + fi_param_get_bool(&lnx_prov, "disable_shm", &lnx_env.disable_shm); + + rc = fi_param_get_str(&lnx_prov, "multi_rail_selection", + &mr_selection); + if (rc == -FI_ENOENT || rc == -FI_ENODATA) { + /* if the user makes no selection, pick the safest method + * which ensures send after send is satisfied + */ + lnx_env.mr = LNX_MR_SELECTION_PER_PEER; + } else if (!rc) { + if (strncasecmp("PER_MSG", mr_selection, strlen(mr_selection)) == 0) { + lnx_env.mr = LNX_MR_SELECTION_PER_MSG; + } else if (strncasecmp("PER_PEER", mr_selection, + strlen(mr_selection)) == 0) { + lnx_env.mr = LNX_MR_SELECTION_PER_PEER; + } else { + FI_WARN(&lnx_prov, 
FI_LOG_CORE, + "Unknown multi-rail selection policy: %s\n", + mr_selection); + return FI_EINVAL; + } + } else { + return rc; + } + + fi_param_get_bool(&lnx_prov, "dump_stats", &lnx_env.dump_stats); + + return 0; +} + +static int lnx_getinfo(uint32_t version, const char *node, const char *service, + uint64_t flags, const struct fi_info *hints, + struct fi_info **info) { int rc; bool new_prov; struct fi_info *lnx_hints = NULL; struct lnx_link_info *link; - char *linked_provs, *linked_provs_cp; char *save_ptr0, *save_ptr1, *save_ptr2, *provider_block, *provider, *domain, *link_block; @@ -540,35 +597,32 @@ int lnx_getinfo(uint32_t version, const char *node, const char *service, if (!dlist_empty(&lnx_links)) goto generate_info; - rc = fi_param_get_str(&lnx_prov, "prov_links", - &linked_provs); + rc = lnx_init_env(); if (rc) return rc; - if (strstr(linked_provs, "lnx")) { + if (strstr(lnx_env.prov_links, "lnx")) { FI_WARN(&lnx_prov, FI_LOG_FABRIC, - "Can't specify the lnx provider as part of the link: %s\n", - linked_provs); + "Can't specify the lnx provider as part of the link: " + "%s\n", lnx_env.prov_links); return -FI_EINVAL; } - - linked_provs_cp = strdup(linked_provs); - if (!linked_provs_cp) - return -FI_ENOMEM; + rc = ofi_check_info(&lnx_util_prov, &lnx_info, version, hints); + if (rc) + return rc; /* If the hints are not provided then we endup with a new block */ lnx_hints = fi_dupinfo(hints); if (!lnx_hints) return -FI_ENOMEM; - rc = ofi_exclude_prov_name(&lnx_hints->fabric_attr->prov_name, lnx_prov.name); + lnx_hints->caps |= FI_PEER | FI_AV_USER_ID; + + rc = ofi_exclude_prov_name(&lnx_hints->fabric_attr->prov_name, + lnx_prov.name); if (rc) return rc; - /* get the providers which support peer functionality. These are - * the only ones we can link*/ - lnx_hints->caps |= FI_PEER; - /* Format: * '+' appends another provider to the link. * ':' identifies the start of the domains to include. 
If not @@ -591,7 +645,7 @@ int lnx_getinfo(uint32_t version, const char *node, const char *service, * domains * ex: cxi:cxi0,cxi1 */ - link_block = strtok_r(linked_provs_cp, "|", &save_ptr0); + link_block = strtok_r(lnx_env.prov_links, "|", &save_ptr0); while (link_block) { lnx_trim(link_block); @@ -614,9 +668,10 @@ int lnx_getinfo(uint32_t version, const char *node, const char *service, domain = strtok_r(NULL, ",", &save_ptr2); do { lnx_trim(domain); - rc = lnx_getinfo_helper(version, provider, domain, - lnx_hints, new_prov, - &link->link_providers); + rc = lnx_getinfo_helper(version, provider, + domain, lnx_hints, + new_prov, + &link->link_providers); if (rc) return rc; new_prov = false; @@ -626,19 +681,17 @@ int lnx_getinfo(uint32_t version, const char *node, const char *service, dlist_insert_tail(&link->entry, &lnx_links); link_block = strtok_r(NULL, "|", &save_ptr0); } - free(linked_provs_cp); generate_info: - rc = lnx_generate_link_info(info, hints); + rc = lnx_generate_link_info(version, hints, info); if (lnx_hints) fi_freeinfo(lnx_hints); return rc; } -static int -lnx_setup_core_fab(struct lnx_fabric *lnx_fab, struct fi_info *info, - void *context, int i) +static int lnx_setup_core_fab(struct lnx_fabric *lnx_fab, struct fi_info *info, + void *context, int i) { int rc = -FI_ENOMEM; struct lnx_core_fabric *cf; @@ -658,9 +711,7 @@ lnx_setup_core_fab(struct lnx_fabric *lnx_fab, struct fi_info *info, return rc; } -int -lnx_setup_fabrics(char *name, struct lnx_fabric *lnx_fab, - void *context) +int lnx_setup_fabrics(char *name, struct lnx_fabric *lnx_fab, void *context) { int rc; struct lnx_link_info *link; @@ -697,8 +748,8 @@ lnx_setup_fabrics(char *name, struct lnx_fabric *lnx_fab, return FI_SUCCESS; } -int lnx_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric, - void *context) +static int lnx_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric, + void *context) { struct lnx_fabric *lnx_fab; int rc; @@ -708,8 +759,8 @@ int 
lnx_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric, return -FI_ENOMEM; rc = ofi_fabric_init(&lnx_prov, lnx_info.fabric_attr, - lnx_info.fabric_attr, - &lnx_fab->lf_util_fabric, context); + lnx_info.fabric_attr, &lnx_fab->lf_util_fabric, + context); if (rc) goto fail; @@ -723,42 +774,32 @@ int lnx_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric, return rc; } -void lnx_fini(void) +static void lnx_fini(void) { lnx_free_links(&lnx_links); ofi_bufpool_destroy(global_recv_bp); } -int lnx_fabric_close(struct fid *fid) -{ - int rc, i, frc = 0; - struct lnx_core_fabric *cf; - struct lnx_fabric *lnx_fab; - - lnx_fab = container_of(fid, struct lnx_fabric, lf_util_fabric.fabric_fid.fid); - - for (i = 0; i < lnx_fab->lf_num_fabs; i++) { - cf = &lnx_fab->lf_core_fabrics[i]; - - rc = fi_close(&cf->cf_fabric->fid); - if (rc) - frc = rc; - } - - rc = ofi_fabric_close(&lnx_fab->lf_util_fabric); - if (rc) - frc = rc; - - free(lnx_fab->lf_core_fabrics); - - return frc; -} - void ofi_link_fini(void) { lnx_prov.cleanup(); } +struct fi_provider lnx_prov = { + .name = OFI_LNX, + .version = OFI_VERSION_DEF_PROV, + .fi_version = OFI_VERSION_LATEST, + .getinfo = lnx_getinfo, + .fabric = lnx_fabric, + .cleanup = lnx_fini +}; + +struct util_prov lnx_util_prov = { + .prov = &lnx_prov, + .info = &lnx_info, + .flags = 0 +}; + LNX_INI { struct ofi_bufpool_attr bp_attrs = {}; diff --git a/prov/lnx/src/lnx_mr.c b/prov/lnx/src/lnx_mr.c index 41ed63a8022..9450e4a714e 100644 --- a/prov/lnx/src/lnx_mr.c +++ b/prov/lnx/src/lnx_mr.c @@ -1,5 +1,6 @@ /* * Copyright (c) 2025 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU @@ -30,23 +31,6 @@ * SOFTWARE. 
*/ -#include "config.h" - -#include -#include -#include -#include -#include -#include - -#include -#include "ofi_util.h" -#include "ofi.h" -#include "ofi_str.h" -#include "ofi_prov.h" -#include "ofi_perf.h" -#include "ofi_hmem.h" -#include "rdma/fi_ext.h" #include "lnx.h" /* @@ -58,7 +42,8 @@ * target core provider we can do memory registration at that point */ -int lnx_mr_regattr_core(struct lnx_core_domain *cd, void *desc, void **core_desc) +int lnx_mr_regattr_core(struct lnx_core_domain *cd, void *desc, + void **core_desc) { int rc; struct lnx_mr *lm; @@ -135,7 +120,8 @@ int lnx_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr, memset(lm, 0, sizeof(*lm)); lm->lm_attr = *attr; - memcpy(lm->lm_iov, attr->mr_iov, sizeof(struct iovec) * attr->iov_count); + memcpy(lm->lm_iov, attr->mr_iov, + sizeof(struct iovec) * attr->iov_count); lm->lm_attr.mr_iov = lm->lm_iov; mr = &lm->lm_mr; mr->mr_fid.fid.fclass = FI_CLASS_MR; @@ -149,5 +135,3 @@ int lnx_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr, return FI_SUCCESS; } - - diff --git a/prov/lnx/src/lnx_msg.c b/prov/lnx/src/lnx_msg.c new file mode 100644 index 00000000000..8fa818cf8f1 --- /dev/null +++ b/prov/lnx/src/lnx_msg.c @@ -0,0 +1,338 @@ +/* + * Copyright (c) 2022 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. 
+ * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include "lnx.h" + +ssize_t lnx_trecv(struct fid_ep *ep, void *buf, size_t len, void *desc, + fi_addr_t src_addr, uint64_t tag, uint64_t ignore, + void *context) +{ + struct lnx_ep *lep; + const struct iovec iov = {.iov_base = buf, .iov_len = len}; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + return lnx_process_recv(lep, &iov, desc, src_addr, 1, tag, ignore, + context, lnx_ep_rx_flags(lep), true); +} + +ssize_t lnx_trecvv(struct fid_ep *ep, const struct iovec *iov, void **desc, + size_t count, fi_addr_t src_addr, uint64_t tag, + uint64_t ignore, void *context) +{ + void *mr_desc; + struct lnx_ep *lep; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + + if (count == 0) { + mr_desc = NULL; + } else if (iov && count >= 1 && + count <= lep->le_domain->ld_iov_limit) { + mr_desc = desc ? 
desc[0] : NULL; + } else { + FI_WARN(&lnx_prov, FI_LOG_CORE, "Invalid IOV\n"); + return -FI_EINVAL; + } + + return lnx_process_recv(lep, iov, mr_desc, src_addr, count, tag, ignore, + context, lnx_ep_rx_flags(lep), true); +} + +ssize_t lnx_trecvmsg(struct fid_ep *ep, const struct fi_msg_tagged *msg, + uint64_t flags) +{ + void *mr_desc; + struct lnx_ep *lep; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + if (msg->iov_count == 0) { + mr_desc = NULL; + } else if (msg->msg_iov && msg->iov_count >= 1 && + msg->iov_count <= lep->le_domain->ld_iov_limit) { + mr_desc = msg->desc ? msg->desc[0] : NULL; + } else { + FI_WARN(&lnx_prov, FI_LOG_CORE, "Invalid IOV\n"); + return -FI_EINVAL; + } + + return lnx_process_recv(lep, msg->msg_iov, mr_desc, msg->addr, + msg->iov_count, msg->tag, msg->ignore, + msg->context, flags | lep->le_ep.rx_msg_flags, + true); +} + +ssize_t lnx_tsend(struct fid_ep *ep, const void *buf, size_t len, void *desc, + fi_addr_t dest_addr, uint64_t tag, void *context) +{ + int rc; + struct lnx_ep *lep; + void *core_desc = NULL; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + if (rc) + return rc; + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", + core_addr, tag, buf, len); + + if (desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, desc, &core_desc); + if (rc) + return rc; + } + + rc = fi_tsend(cep->cep_ep, buf, len, core_desc, core_addr, tag, + context); + + if (!rc) + cep->cep_t_stats.st_num_tsend++; + + return rc; +} + +ssize_t lnx_tsendv(struct fid_ep *ep, const struct iovec *iov, void **desc, + size_t count, fi_addr_t dest_addr, uint64_t tag, + void *context) +{ + int rc; + struct lnx_ep *lep; + void *core_desc = NULL; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + + lep = 
container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + if (rc) + return rc; + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", + core_addr, tag, iov->iov_base, iov->iov_len); + + if (desc && *desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, *desc, &core_desc); + if (rc) + return rc; + } + + rc = fi_tsendv(cep->cep_ep, iov, (core_desc) ? &core_desc : NULL, + count, core_addr, tag, context); + + if (!rc) + cep->cep_t_stats.st_num_tsendv++; + + return rc; +} + +ssize_t lnx_tsendmsg(struct fid_ep *ep, const struct fi_msg_tagged *msg, + uint64_t flags) +{ + int rc; + struct lnx_ep *lep; + void *core_desc = NULL; + struct lnx_core_ep *cep; + struct fi_msg_tagged core_msg; + + memcpy(&core_msg, msg, sizeof(*msg)); + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, core_msg.addr, &cep, + &core_msg.addr); + if (rc) + return rc; + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "sending to %" PRIx64 " tag %" PRIx64 "\n", + core_msg.addr, core_msg.tag); + + if (core_msg.desc && *core_msg.desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, *core_msg.desc, + &core_desc); + if (rc) + return rc; + core_msg.desc = &core_desc; + } + + rc = fi_tsendmsg(cep->cep_ep, &core_msg, flags); + + if (!rc) + cep->cep_t_stats.st_num_tsendmsg++; + + return rc; +} + +ssize_t lnx_tinject(struct fid_ep *ep, const void *buf, size_t len, + fi_addr_t dest_addr, uint64_t tag) +{ + int rc; + struct lnx_ep *lep; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + if (rc) + return rc; + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", + 
core_addr, tag, buf, len); + + rc = fi_tinject(cep->cep_ep, buf, len, core_addr, tag); + + if (!rc) + cep->cep_t_stats.st_num_tinject++; + + return rc; +} + +ssize_t lnx_tsenddata(struct fid_ep *ep, const void *buf, size_t len, + void *desc, uint64_t data, fi_addr_t dest_addr, + uint64_t tag, void *context) +{ + int rc; + struct lnx_ep *lep; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + void *core_desc = desc; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + if (rc) + return rc; + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", + core_addr, tag, buf, len); + + if (desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, desc, &core_desc); + if (rc) + return rc; + } + + rc = fi_tsenddata(cep->cep_ep, buf, len, core_desc, + data, core_addr, tag, context); + + if (!rc) + cep->cep_t_stats.st_num_tsenddata++; + + return rc; +} + +ssize_t lnx_tinjectdata(struct fid_ep *ep, const void *buf, size_t len, + uint64_t data, fi_addr_t dest_addr, uint64_t tag) +{ + int rc; + struct lnx_ep *lep; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + if (rc) + return rc; + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", + core_addr, tag, buf, len); + + rc = fi_tinjectdata(cep->cep_ep, buf, len, data, core_addr, tag); + + if (!rc) + cep->cep_t_stats.st_num_tinjectdata++; + + return rc; +} + +struct fi_ops_tagged lnx_tagged_ops = { + .size = sizeof(struct fi_ops_tagged), + .recv = lnx_trecv, + .recvv = lnx_trecvv, + .recvmsg = lnx_trecvmsg, + .send = lnx_tsend, + .sendv = lnx_tsendv, + .sendmsg = lnx_tsendmsg, + .inject = lnx_tinject, + .senddata = lnx_tsenddata, + .injectdata = 
lnx_tinjectdata, +}; + +struct fi_ops_msg lnx_msg_ops = { + .size = sizeof(struct fi_ops_msg), + .recv = fi_no_msg_recv, + .recvv = fi_no_msg_recvv, + .recvmsg = fi_no_msg_recvmsg, + .send = fi_no_msg_send, + .sendv = fi_no_msg_sendv, + .sendmsg = fi_no_msg_sendmsg, + .inject = fi_no_msg_inject, + .senddata = fi_no_msg_senddata, + .injectdata = fi_no_msg_injectdata, +}; diff --git a/prov/lnx/src/lnx_ops.c b/prov/lnx/src/lnx_ops.c deleted file mode 100644 index dd7b5dbbeca..00000000000 --- a/prov/lnx/src/lnx_ops.c +++ /dev/null @@ -1,800 +0,0 @@ -/* - * Copyright (c) 2022 ORNL. All rights reserved. - * - * This software is available to you under a choice of one of two - * licenses. You may choose to be licensed under the terms of the GNU - * General Public License (GPL) Version 2, available from the file - * COPYING in the main directory of this source tree, or the - * BSD license below: - * - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * - Redistributions of source code must retain the above - * copyright notice, this list of conditions and the following - * disclaimer. - * - * - Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, - * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF - * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND - * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS - * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN - * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. 
- */ - -#include "config.h" - -#include -#include -#include -#include -#include -#include -#include - -#include -#include "ofi_util.h" -#include "ofi.h" -#include "ofi_str.h" -#include "ofi_prov.h" -#include "ofi_perf.h" -#include "ofi_hmem.h" -#include "ofi_lock.h" -#include "rdma/fi_ext.h" -#include "ofi_iov.h" -#include "lnx.h" - -static int (*lnx_select_send_endpoints[LNX_MR_SELECTION_MAX]) - (struct lnx_ep *lep, fi_addr_t lnx_addr, - struct lnx_core_ep **cep_out, fi_addr_t *core_addr) = -{ - [LNX_MR_SELECTION_PER_MSG] = lnx_select_send_endpoints_msg, - [LNX_MR_SELECTION_PER_PEER] = lnx_select_send_endpoints_peer, -}; - -static void (*lnx_set_send_pair[LNX_MR_SELECTION_MAX]) - (struct lnx_ep *lep, struct lnx_core_ep *cep, fi_addr_t addr) = -{ - [LNX_MR_SELECTION_PER_MSG] = lnx_set_send_pair_noop, - [LNX_MR_SELECTION_PER_PEER] = lnx_set_send_pair_peer, -}; - -int lnx_get_msg(struct fid_peer_srx *srx, struct fi_peer_match_attr *match, - struct fi_peer_rx_entry **entry) -{ - return -FI_ENOSYS; -} - -int lnx_queue_msg(struct fi_peer_rx_entry *entry) -{ - return -FI_ENOSYS; -} - -void lnx_free_entry(struct fi_peer_rx_entry *entry) -{ - struct lnx_rx_entry *rx_entry; - ofi_spin_t *bplock; - - rx_entry = container_of(entry, struct lnx_rx_entry, rx_entry); - - if (rx_entry->rx_global) - bplock = &global_bplock; - else - bplock = &rx_entry->rx_lep->le_bplock; - - ofi_spin_lock(bplock); - ofi_buf_free(rx_entry); - ofi_spin_unlock(bplock); -} - -static void -lnx_init_rx_entry(struct lnx_rx_entry *entry, const struct iovec *iov, - void **desc, size_t count, fi_addr_t addr, uint64_t tag, - uint64_t ignore, void *context, uint64_t flags) -{ - if (iov) - memcpy(&entry->rx_iov, iov, sizeof(*iov) * count); - if (desc) - memcpy(entry->rx_desc, desc, sizeof(*desc) * count); - - entry->rx_entry.iov = entry->rx_iov; - entry->rx_entry.desc = entry->rx_desc; - entry->rx_entry.count = count; - entry->rx_entry.addr = addr; - entry->rx_entry.context = context; - entry->rx_entry.tag = 
tag; - entry->rx_entry.flags = flags | FI_TAGGED | FI_RECV; - entry->rx_ignore = ignore; -} - -static struct lnx_rx_entry * -get_rx_entry(struct lnx_ep *lep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t addr, - uint64_t tag, uint64_t ignore, void *context, - uint64_t flags) -{ - struct lnx_rx_entry *rx_entry = NULL; - ofi_spin_t *bplock; - struct ofi_bufpool *bp; - - /* if lp is NULL, then we don't know where the message is going to - * come from, so allocate the rx_entry from a global pool - */ - if (!lep) { - bp = global_recv_bp; - bplock = &global_bplock; - } else { - bp = lep->le_recv_bp; - bplock = &lep->le_bplock; - } - - ofi_spin_lock(bplock); - rx_entry = (struct lnx_rx_entry *)ofi_buf_alloc(bp); - ofi_spin_unlock(bplock); - if (rx_entry) { - memset(rx_entry, 0, sizeof(*rx_entry)); - if (!lep) - rx_entry->rx_global = true; - rx_entry->rx_lep = lep; - lnx_init_rx_entry(rx_entry, iov, desc, count, addr, tag, - ignore, context, flags); - } - - return rx_entry; -} - -static inline struct lnx_rx_entry * -lnx_find_first_match(struct lnx_queue *q, struct lnx_match_attr *match) -{ - struct lnx_rx_entry *rx_entry; - - rx_entry = (struct lnx_rx_entry *) dlist_find_first_match( - &q->lq_queue, q->lq_match_func, match); - - return rx_entry; -} - -static inline void lnx_update_queue_stats(struct lnx_queue *q, bool dq) -{ - if (dq) - q->lq_size--; - else - q->lq_size++; - - if (q->lq_size > q->lq_max) - q->lq_max = q->lq_size; - - q->lq_rolling_sum += q->lq_size; - q->lq_count++; - q->lq_rolling_avg = q->lq_rolling_sum / q->lq_count; -} - -static inline struct lnx_rx_entry * -lnx_remove_first_match(struct lnx_queue *q, struct lnx_match_attr *match) -{ - struct lnx_rx_entry *rx_entry; - - rx_entry = (struct lnx_rx_entry *) dlist_remove_first_match( - &q->lq_queue, q->lq_match_func, match); - if (rx_entry) - lnx_update_queue_stats(q, true); - - return rx_entry; -} - -static inline void -lnx_insert_rx_entry(struct lnx_queue *q, struct lnx_rx_entry *entry) 
-{ - dlist_insert_tail(&entry->entry, &q->lq_queue); - lnx_update_queue_stats(q, false); -} - -int lnx_queue_tag(struct fi_peer_rx_entry *entry) -{ - struct lnx_rx_entry *rx_entry; - struct lnx_peer_srq *lnx_srq = (struct lnx_peer_srq*)entry->owner_context; - - rx_entry = container_of(entry, struct lnx_rx_entry, rx_entry); - FI_DBG(&lnx_prov, FI_LOG_CORE, - "addr = %" PRIx64 " tag = %" PRIx64 " ignore = 0 found\n", - entry->addr, entry->tag); - - lnx_insert_rx_entry(&lnx_srq->lps_trecv.lqp_unexq, rx_entry); - - return 0; -} - -int lnx_get_tag(struct fid_peer_srx *srx, struct fi_peer_match_attr *match, - struct fi_peer_rx_entry **entry) -{ - struct lnx_match_attr match_attr = {0}; - struct lnx_peer_srq *lnx_srq; - struct lnx_core_ep *cep; - struct lnx_ep *lep; - struct lnx_rx_entry *rx_entry; - fi_addr_t addr = match->addr; - uint64_t tag = match->tag; - int rc = 0; - - cep = srx->ep_fid.fid.context; - lep = cep->cep_parent; - lnx_srq = &lep->le_srq; - - match_attr.lm_addr = addr; - match_attr.lm_tag = tag; - - rx_entry = lnx_remove_first_match(&lnx_srq->lps_trecv.lqp_recvq, - &match_attr); - if (rx_entry) { - FI_DBG(&lnx_prov, FI_LOG_CORE, - "addr = %" PRIx64 " tag = %" PRIx64 " ignore = 0 found\n", - match_attr.lm_addr, tag); - - goto assign; - } - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "addr = %" PRIx64 " tag = %" PRIx64 " ignore = 0 not found\n", - match_attr.lm_addr, tag); - - rx_entry = get_rx_entry(lep, NULL, NULL, 0, addr, tag, 0, NULL, - FI_TAGGED | FI_RECV); - if (!rx_entry) { - rc = -FI_ENOMEM; - goto out; - } - - rx_entry->rx_entry.owner_context = lnx_srq; - rx_entry->rx_cep = cep; - - rc = -FI_ENOENT; - - cep->cep_t_stats.st_num_unexp_msgs++; - - goto finalize; - -assign: - lnx_set_send_pair[lep->le_mr](lep, cep, addr); - - cep->cep_t_stats.st_num_posted_recvs++; - - rx_entry->rx_entry.addr = lnx_get_core_addr(cep, addr); - if (rx_entry->rx_entry.desc && *rx_entry->rx_entry.desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, - *rx_entry->rx_entry.desc, - 
rx_entry->rx_entry.desc); - if (rc) - return rc; - } -finalize: - rx_entry->rx_entry.msg_size = match->msg_size; - *entry = &rx_entry->rx_entry; - -out: - return rc; -} - -static int -lnx_discard(struct lnx_ep *lep, struct lnx_rx_entry *rx_entry, void *context) -{ - struct lnx_core_ep *cep = rx_entry->rx_cep; - int rc; - - rc = cep->cep_srx.peer_ops->discard_tag(&rx_entry->rx_entry); - if (rc) { - FI_WARN(&lnx_prov, FI_LOG_CORE, - "Error discarding message from %s\n", - cep->cep_domain->cd_info->fabric_attr->name); - } - - rc = ofi_cq_write(lep->le_ep.rx_cq, context, - rx_entry->rx_entry.flags, - rx_entry->rx_entry.msg_size, NULL, - rx_entry->rx_entry.cq_data, - rx_entry->rx_entry.tag); - - dlist_remove(&rx_entry->entry); - lnx_free_entry(&rx_entry->rx_entry); - - return rc; -} - -static int -lnx_peek(struct lnx_ep *lep, struct lnx_match_attr *match_attr, void *context, - uint64_t flags) -{ - int rc; - struct lnx_rx_entry *rx_entry; - struct lnx_peer_srq *lnx_srq = &lep->le_srq; - - rx_entry = lnx_find_first_match(&lnx_srq->lps_trecv.lqp_unexq, - match_attr); - if (!rx_entry) { - FI_DBG(&lnx_prov, FI_LOG_CORE, - "PEEK addr=%" PRIx64 " tag=%" PRIx64 " ignore=%" PRIx64 "\n", - match_attr->lm_addr, match_attr->lm_tag, - match_attr->lm_ignore); - return ofi_cq_write_error_peek(lep->le_ep.rx_cq, - match_attr->lm_tag, context); - } - - rc = ofi_cq_write(lep->le_ep.rx_cq, context, - rx_entry->rx_entry.flags, - rx_entry->rx_entry.msg_size, NULL, - rx_entry->rx_entry.cq_data, - rx_entry->rx_entry.tag); - - if (flags & FI_DISCARD) { - rc = rx_entry->rx_cep->cep_srx.peer_ops->discard_tag( - &rx_entry->rx_entry); - dlist_remove(&rx_entry->entry); - lnx_free_entry(&rx_entry->rx_entry); - goto out; - } - - if (flags & FI_CLAIM) { - dlist_remove(&rx_entry->entry); - ((struct fi_context *)context)->internal[0] = rx_entry; - } - -out: - return rc; -} - -/* - * if lp is NULL, then we're attempting to receive from any peer so - * matching the tag is the only thing that matters. 
- * - * if lp != NULL, then we're attempting to receive from a particular - * peer. This peer can have multiple endpoints serviced by different core - * providers. - * - * Therefore when we check the unexpected queue, we need to check - * if we received any messages from any of the peer's addresses. If we - * find one, then we kick the core provider associated with that - * address to receive the message. - * - * If nothing is found on the unexpected messages, then add a receive - * request on the SRQ; happens in the lnx_process_recv() - */ -static int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, - fi_addr_t addr, size_t count, uint64_t tag, - uint64_t ignore, void *context, uint64_t flags, - bool tagged) -{ - struct lnx_peer_srq *lnx_srq = &lep->le_srq; - struct lnx_rx_entry *rx_entry; - struct lnx_match_attr match_attr; - struct lnx_core_ep *cep; - int rc = 0; - fi_addr_t sub_addr, encoded_addr = lnx_encode_fi_addr(addr, 0); - - /* Matching format should always be in the encoded form */ - match_attr.lm_addr = (addr == FI_ADDR_UNSPEC) || - !(lep->le_ep.caps & FI_DIRECTED_RECV) ? 
FI_ADDR_UNSPEC : - encoded_addr; - match_attr.lm_ignore = ignore; - match_attr.lm_tag = tag; - - if (flags & FI_PEEK) - return lnx_peek(lep, &match_attr, context, flags); - - if (flags & FI_DISCARD) { - rx_entry = (struct lnx_rx_entry *) - (((struct fi_context *)context)->internal[0]); - return lnx_discard(lep, rx_entry, context); - } - - if (flags & FI_CLAIM) { - rx_entry = (struct lnx_rx_entry *) - (((struct fi_context *)context)->internal[0]); - } else { - rx_entry = lnx_remove_first_match(&lnx_srq->lps_trecv.lqp_unexq, - &match_attr); - if (!rx_entry) { - FI_DBG(&lnx_prov, FI_LOG_CORE, - "addr=%" PRIx64 " tag=%" PRIx64 " ignore=%" PRIx64 " buf=%p len=%zu not found\n", - addr, tag, ignore, iov->iov_base, iov->iov_len); - - goto nomatch; - } - } - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "addr=%" PRIx64 " tag=%" PRIx64 " ignore=%" PRIx64 " buf=%p len=%zu found\n", - addr, tag, ignore, iov->iov_base, iov->iov_len); - - /* match is found in the unexpected queue. call into the core - * provider to complete this message - */ - cep = rx_entry->rx_cep; - sub_addr = lnx_get_core_addr(cep, rx_entry->rx_entry.addr); - lnx_init_rx_entry(rx_entry, iov, desc, count, sub_addr, - tag, ignore, context, - rx_entry->rx_entry.flags | flags); - rx_entry->rx_entry.msg_size = MIN(ofi_total_iov_len(iov, count), - rx_entry->rx_entry.msg_size); - - if (desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, desc, - rx_entry->rx_entry.desc); - if (rc) - return rc; - } - - lnx_set_send_pair[lep->le_mr](lep, cep, match_attr.lm_addr); - - if (tagged) - rc = cep->cep_srx.peer_ops->start_tag(&rx_entry->rx_entry); - else - rc = cep->cep_srx.peer_ops->start_msg(&rx_entry->rx_entry); - - if (rc == -FI_EINPROGRESS) { - /* this is telling me that more messages can match the same - * rx_entry. 
So keep it on the queue - */ - FI_DBG(&lnx_prov, FI_LOG_CORE, - "addr = %" PRIx64 " tag = %" PRIx64 " ignore = %" PRIx64 " start_tag() in progress\n", - addr, tag, ignore); - - goto insert_recvq; - } else if (rc) { - FI_WARN(&lnx_prov, FI_LOG_CORE, "start tag failed with %d\n", rc); - } - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "addr = %" PRIx64 " tag = %" PRIx64 " ignore = %" PRIx64 " start_tag() success\n", - addr, tag, ignore); - - return 0; - -nomatch: - /* nothing on the unexpected queue, then allocate one and put it on - * the receive queue - */ - rx_entry = get_rx_entry(NULL, iov, (desc) ? &desc : NULL, count, - match_attr.lm_addr, tag, ignore, context, - flags); - if (!rx_entry) { - rc = -FI_ENOMEM; - goto out; - } - -insert_recvq: - lnx_insert_rx_entry(&lnx_srq->lps_trecv.lqp_recvq, rx_entry); - -out: - return rc; -} - -ssize_t lnx_trecv(struct fid_ep *ep, void *buf, size_t len, void *desc, - fi_addr_t src_addr, uint64_t tag, uint64_t ignore, void *context) -{ - struct lnx_ep *lep; - const struct iovec iov = {.iov_base = buf, .iov_len = len}; - - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - return lnx_process_recv(lep, &iov, desc, src_addr, 1, tag, ignore, - context, lnx_ep_rx_flags(lep), true); -} - -ssize_t lnx_trecvv(struct fid_ep *ep, const struct iovec *iov, void **desc, - size_t count, fi_addr_t src_addr, uint64_t tag, uint64_t ignore, - void *context) -{ - void *mr_desc; - struct lnx_ep *lep; - - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - - if (count == 0) { - mr_desc = NULL; - } else if (iov && count >= 1 && - count <= lep->le_domain->ld_iov_limit) { - mr_desc = desc ? 
desc[0] : NULL; - } else { - FI_WARN(&lnx_prov, FI_LOG_CORE, "Invalid IOV\n"); - return -FI_EINVAL; - } - - return lnx_process_recv(lep, iov, mr_desc, src_addr, count, tag, ignore, - context, lnx_ep_rx_flags(lep), true); -} - -ssize_t lnx_trecvmsg(struct fid_ep *ep, const struct fi_msg_tagged *msg, - uint64_t flags) -{ - void *mr_desc; - struct lnx_ep *lep; - - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - if (msg->iov_count == 0) { - mr_desc = NULL; - } else if (msg->msg_iov && msg->iov_count >= 1 && - msg->iov_count <= lep->le_domain->ld_iov_limit) { - mr_desc = msg->desc ? msg->desc[0] : NULL; - } else { - FI_WARN(&lnx_prov, FI_LOG_CORE, "Invalid IOV\n"); - return -FI_EINVAL; - } - - return lnx_process_recv(lep, msg->msg_iov, mr_desc, msg->addr, msg->iov_count, - msg->tag, msg->ignore, msg->context, - flags | lep->le_ep.rx_msg_flags, true); -} - -ssize_t lnx_tsend(struct fid_ep *ep, const void *buf, size_t len, void *desc, - fi_addr_t dest_addr, uint64_t tag, void *context) -{ - int rc; - struct lnx_ep *lep; - void *core_desc = NULL; - struct lnx_core_ep *cep; - fi_addr_t core_addr; - - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, &core_addr); - if (rc) - return rc; - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, buf, len); - - if (desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, desc, &core_desc); - if (rc) - return rc; - } - - rc = fi_tsend(cep->cep_ep, buf, len, core_desc, core_addr, tag, context); - - if (!rc) - cep->cep_t_stats.st_num_tsend++; - - return rc; -} - -ssize_t lnx_tsendv(struct fid_ep *ep, const struct iovec *iov, void **desc, - size_t count, fi_addr_t dest_addr, uint64_t tag, void *context) -{ - int rc; - struct lnx_ep *lep; - void *core_desc = NULL; - struct lnx_core_ep *cep; - fi_addr_t core_addr; - - lep = 
container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, &core_addr); - if (rc) - return rc; - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, iov->iov_base, iov->iov_len); - - if (desc && *desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, *desc, &core_desc); - if (rc) - return rc; - } - - rc = fi_tsendv(cep->cep_ep, iov, (core_desc) ? &core_desc : NULL, - count, core_addr, tag, context); - - if (!rc) - cep->cep_t_stats.st_num_tsendv++; - - return rc; -} - -ssize_t lnx_tsendmsg(struct fid_ep *ep, const struct fi_msg_tagged *msg, - uint64_t flags) -{ - int rc; - struct lnx_ep *lep; - void *core_desc = NULL; - struct lnx_core_ep *cep; - struct fi_msg_tagged core_msg; - - memcpy(&core_msg, msg, sizeof(*msg)); - - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, core_msg.addr, &cep, &core_msg.addr); - if (rc) - return rc; - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 "\n", - core_msg.addr, core_msg.tag); - - if (core_msg.desc && *core_msg.desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, *core_msg.desc, &core_desc); - if (rc) - return rc; - core_msg.desc = &core_desc; - } - - rc = fi_tsendmsg(cep->cep_ep, &core_msg, flags); - - if (!rc) - cep->cep_t_stats.st_num_tsendmsg++; - - return rc; -} - -ssize_t lnx_tinject(struct fid_ep *ep, const void *buf, size_t len, - fi_addr_t dest_addr, uint64_t tag) -{ - int rc; - struct lnx_ep *lep; - struct lnx_core_ep *cep; - fi_addr_t core_addr; - - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, &core_addr); - if (rc) - return rc; - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, 
buf, len); - - rc = fi_tinject(cep->cep_ep, buf, len, core_addr, tag); - - if (!rc) - cep->cep_t_stats.st_num_tinject++; - - return rc; -} - -ssize_t lnx_tsenddata(struct fid_ep *ep, const void *buf, size_t len, void *desc, - uint64_t data, fi_addr_t dest_addr, uint64_t tag, void *context) -{ - int rc; - struct lnx_ep *lep; - struct lnx_core_ep *cep; - fi_addr_t core_addr; - void *core_desc = desc; - - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, &core_addr); - if (rc) - return rc; - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, buf, len); - - if (desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, desc, &core_desc); - if (rc) - return rc; - } - - rc = fi_tsenddata(cep->cep_ep, buf, len, core_desc, - data, core_addr, tag, context); - - if (!rc) - cep->cep_t_stats.st_num_tsenddata++; - - return rc; -} - -ssize_t lnx_tinjectdata(struct fid_ep *ep, const void *buf, size_t len, - uint64_t data, fi_addr_t dest_addr, uint64_t tag) -{ - int rc; - struct lnx_ep *lep; - struct lnx_core_ep *cep; - fi_addr_t core_addr; - - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, &core_addr); - if (rc) - return rc; - - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, buf, len); - - rc = fi_tinjectdata(cep->cep_ep, buf, len, data, core_addr, tag); - - if (!rc) - cep->cep_t_stats.st_num_tinjectdata++; - - return rc; -} - -struct fi_ops_tagged lnx_tagged_ops = { - .size = sizeof(struct fi_ops_tagged), - .recv = lnx_trecv, - .recvv = lnx_trecvv, - .recvmsg = lnx_trecvmsg, - .send = lnx_tsend, - .sendv = lnx_tsendv, - .sendmsg = lnx_tsendmsg, - .inject = lnx_tinject, - .senddata = lnx_tsenddata, - .injectdata = lnx_tinjectdata, -}; - -struct 
fi_ops_msg lnx_msg_ops = { - .size = sizeof(struct fi_ops_msg), - .recv = fi_no_msg_recv, - .recvv = fi_no_msg_recvv, - .recvmsg = fi_no_msg_recvmsg, - .send = fi_no_msg_send, - .sendv = fi_no_msg_sendv, - .sendmsg = fi_no_msg_sendmsg, - .inject = fi_no_msg_inject, - .senddata = fi_no_msg_senddata, - .injectdata = fi_no_msg_injectdata, -}; - -struct fi_ops_rma lnx_rma_ops = { - .size = sizeof(struct fi_ops_rma), - .read = fi_no_rma_read, - .readv = fi_no_rma_readv, - .readmsg = fi_no_rma_readmsg, - .write = fi_no_rma_write, - .writev = fi_no_rma_writev, - .writemsg = fi_no_rma_writemsg, - .inject = fi_no_rma_inject, - .writedata = fi_no_rma_writedata, - .injectdata = fi_no_rma_injectdata, -}; - -struct fi_ops_atomic lnx_atomic_ops = { - .size = sizeof(struct fi_ops_atomic), - .write = fi_no_atomic_write, - .writev = fi_no_atomic_writev, - .writemsg = fi_no_atomic_writemsg, - .inject = fi_no_atomic_inject, - .readwrite = fi_no_atomic_readwrite, - .readwritev = fi_no_atomic_readwritev, - .readwritemsg = fi_no_atomic_readwritemsg, - .compwrite = fi_no_atomic_compwrite, - .compwritev = fi_no_atomic_compwritev, - .compwritemsg = fi_no_atomic_compwritemsg, - .writevalid = fi_no_atomic_writevalid, - .readwritevalid = fi_no_atomic_readwritevalid, - .compwritevalid = fi_no_atomic_compwritevalid, -}; - - diff --git a/prov/lnx/src/lnx_srx.c b/prov/lnx/src/lnx_srx.c new file mode 100644 index 00000000000..0b3da0053c6 --- /dev/null +++ b/prov/lnx/src/lnx_srx.c @@ -0,0 +1,486 @@ +/* + * Copyright (c) 2022 ORNL. All rights reserved. + * Copyright (c) Intel Corporation. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. 
You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +#include "lnx.h" +#include "ofi_iov.h" + +static int lnx_get_msg(struct fid_peer_srx *srx, + struct fi_peer_match_attr *match, + struct fi_peer_rx_entry **entry) +{ + return -FI_ENOSYS; +} + +static int lnx_queue_msg(struct fi_peer_rx_entry *entry) +{ + return -FI_ENOSYS; +} + +static void lnx_free_entry(struct fi_peer_rx_entry *entry) +{ + struct lnx_rx_entry *rx_entry; + ofi_spin_t *bplock; + + rx_entry = container_of(entry, struct lnx_rx_entry, rx_entry); + + if (rx_entry->rx_global) + bplock = &global_bplock; + else + bplock = &rx_entry->rx_lep->le_bplock; + + ofi_spin_lock(bplock); + ofi_buf_free(rx_entry); + ofi_spin_unlock(bplock); +} + +static void lnx_init_rx_entry(struct lnx_rx_entry *entry, + const struct iovec *iov, + void **desc, size_t count, fi_addr_t addr, + uint64_t tag, uint64_t ignore, void *context, + uint64_t flags) +{ + if (iov) + memcpy(&entry->rx_iov, iov, sizeof(*iov) * count); + if (desc) + memcpy(entry->rx_desc, desc, sizeof(*desc) * count); + + entry->rx_entry.iov = entry->rx_iov; + entry->rx_entry.desc = entry->rx_desc; + entry->rx_entry.count = count; + entry->rx_entry.addr = addr; + entry->rx_entry.context = context; + entry->rx_entry.tag = tag; + entry->rx_entry.flags = flags | FI_TAGGED | FI_RECV; + entry->rx_ignore = ignore; +} + +static struct lnx_rx_entry *get_rx_entry(struct lnx_ep *lep, + const struct iovec *iov, void **desc, + size_t count, fi_addr_t addr, + uint64_t tag, uint64_t ignore, + void *context, uint64_t flags) +{ + struct lnx_rx_entry *rx_entry = NULL; + ofi_spin_t *bplock; + struct ofi_bufpool *bp; + + /* if lp is NULL, then we don't know where the message is going to + * come from, so allocate the rx_entry from a global pool + */ + if (!lep) { + bp = global_recv_bp; + bplock = &global_bplock; + } else { + bp = lep->le_recv_bp; + bplock = &lep->le_bplock; + } + + ofi_spin_lock(bplock); + rx_entry = (struct lnx_rx_entry *)ofi_buf_alloc(bp); + ofi_spin_unlock(bplock); + if (rx_entry) { + memset(rx_entry, 0, 
sizeof(*rx_entry)); + if (!lep) + rx_entry->rx_global = true; + rx_entry->rx_lep = lep; + lnx_init_rx_entry(rx_entry, iov, desc, count, addr, tag, + ignore, context, flags); + } + + return rx_entry; +} + +static inline struct lnx_rx_entry *lnx_find_first_match( + struct lnx_queue *q, struct lnx_match_attr *match) +{ + struct lnx_rx_entry *rx_entry; + + rx_entry = (struct lnx_rx_entry *) dlist_find_first_match( + &q->lq_queue, q->lq_match_func, match); + + return rx_entry; +} + +static inline void lnx_update_queue_stats(struct lnx_queue *q, bool dq) +{ + if (dq) + q->lq_size--; + else + q->lq_size++; + + if (q->lq_size > q->lq_max) + q->lq_max = q->lq_size; + + q->lq_rolling_sum += q->lq_size; + q->lq_count++; + q->lq_rolling_avg = q->lq_rolling_sum / q->lq_count; +} + +static inline struct lnx_rx_entry *lnx_remove_first_match( + struct lnx_queue *q, struct lnx_match_attr *match) +{ + struct lnx_rx_entry *rx_entry; + + rx_entry = (struct lnx_rx_entry *) dlist_remove_first_match( + &q->lq_queue, q->lq_match_func, match); + if (rx_entry) + lnx_update_queue_stats(q, true); + + return rx_entry; +} + +static inline void lnx_insert_rx_entry(struct lnx_queue *q, + struct lnx_rx_entry *entry) +{ + dlist_insert_tail(&entry->entry, &q->lq_queue); + lnx_update_queue_stats(q, false); +} + +static int lnx_queue_tag(struct fi_peer_rx_entry *entry) +{ + struct lnx_rx_entry *rx_entry; + struct lnx_peer_srq *lnx_srq = + (struct lnx_peer_srq*)entry->owner_context; + + rx_entry = container_of(entry, struct lnx_rx_entry, rx_entry); + FI_DBG(&lnx_prov, FI_LOG_CORE, + "addr = %" PRIx64 " tag = %" PRIx64 " ignore = 0 found\n", + entry->addr, entry->tag); + + lnx_insert_rx_entry(&lnx_srq->lps_trecv.lqp_unexq, rx_entry); + + return 0; +} + +static int lnx_get_tag(struct fid_peer_srx *srx, + struct fi_peer_match_attr *match, + struct fi_peer_rx_entry **entry) +{ + struct lnx_match_attr match_attr = {0}; + struct lnx_peer_srq *lnx_srq; + struct lnx_core_ep *cep; + struct lnx_ep *lep; + struct 
lnx_rx_entry *rx_entry;
+	fi_addr_t addr = match->addr;
+	uint64_t tag = match->tag;
+	int rc = 0;
+
+	cep = srx->ep_fid.fid.context;
+	lep = cep->cep_parent;
+	lnx_srq = &lep->le_srq;
+
+	match_attr.lm_addr = addr;
+	match_attr.lm_tag = tag;
+
+	rx_entry = lnx_remove_first_match(&lnx_srq->lps_trecv.lqp_recvq,
+					  &match_attr);
+	if (rx_entry) {
+		FI_DBG(&lnx_prov, FI_LOG_CORE,
+		       "addr = %" PRIx64 " tag = %" PRIx64
+		       " ignore = 0 found\n", match_attr.lm_addr, tag);
+
+		goto assign;
+	}
+
+	FI_DBG(&lnx_prov, FI_LOG_CORE,
+	       "addr = %" PRIx64 " tag = %" PRIx64 " ignore = 0 not found\n",
+	       match_attr.lm_addr, tag);
+
+	rx_entry = get_rx_entry(lep, NULL, NULL, 0, addr, tag, 0, NULL,
+				FI_TAGGED | FI_RECV);
+	if (!rx_entry) {
+		rc = -FI_ENOMEM;
+		goto out;
+	}
+
+	rx_entry->rx_entry.owner_context = lnx_srq;
+	rx_entry->rx_cep = cep;
+
+	rc = -FI_ENOENT;
+
+	cep->cep_t_stats.st_num_unexp_msgs++;
+
+	goto finalize;
+
+assign:
+	lnx_set_send_pair[lep->le_mr](lep, cep, addr);
+
+	cep->cep_t_stats.st_num_posted_recvs++;
+
+	rx_entry->rx_entry.addr = lnx_get_core_addr(cep, addr);
+	if (rx_entry->rx_entry.desc && *rx_entry->rx_entry.desc) {
+		rc = lnx_mr_regattr_core(cep->cep_domain,
+					 *rx_entry->rx_entry.desc,
+					 rx_entry->rx_entry.desc);
+		if (rc)
+			return rc;
+	}
+finalize:
+	rx_entry->rx_entry.msg_size = match->msg_size;
+	*entry = &rx_entry->rx_entry;
+
+out:
+	return rc;
+}
+
+/*
+ * Walk the unexpected queue of @qp and, for every entry whose address is
+ * still FI_ADDR_UNSPEC, store the address returned by @get_addr for it.
+ *
+ * The entries are reached through their 'entry' list link (the same member
+ * that lnx_peek() and lnx_discard() unlink), so the walk does not depend on
+ * where that member sits inside struct lnx_rx_entry.
+ */
+static void lnx_update_msg_entries(
+	struct lnx_qpair *qp,
+	fi_addr_t (*get_addr)(struct fi_peer_rx_entry *))
+{
+	struct lnx_queue *q = &qp->lqp_unexq;
+	struct lnx_rx_entry *rx_entry;
+
+	dlist_foreach_container(&q->lq_queue, struct lnx_rx_entry, rx_entry,
+				entry) {
+		if (rx_entry->rx_entry.addr == FI_ADDR_UNSPEC)
+			rx_entry->rx_entry.addr = get_addr(&rx_entry->rx_entry);
+	}
+}
+
+/*
+ * foreach_unspec_addr callback of lnx_srx_ops.
+ *
+ * Resolves the owning lnx endpoint from the context of @srx and runs
+ * lnx_update_msg_entries() on both the tagged (lps_trecv) and the untagged
+ * (lps_recv) queue pairs of that endpoint's shared receive queue.
+ */
+static void lnx_foreach_unspec_addr(
+	struct fid_peer_srx *srx,
+	fi_addr_t (*get_addr)(struct fi_peer_rx_entry *))
+{
+	struct lnx_ep *lep;
+	struct lnx_core_ep *cep;
+
+	cep = (struct lnx_core_ep *) srx->ep_fid.fid.context;
+	lep = cep->cep_parent;
+
+	lnx_update_msg_entries(&lep->le_srq.lps_trecv, get_addr);
+	lnx_update_msg_entries(&lep->le_srq.lps_recv, get_addr);
+}
+
+/* Owner-side peer SRX operation table exported by this file. */
+struct fi_ops_srx_owner lnx_srx_ops = {
+	.size = sizeof(struct fi_ops_srx_owner),
+	.get_msg = lnx_get_msg,
+	.get_tag = lnx_get_tag,
+	.queue_msg = lnx_queue_msg,
+	.queue_tag = lnx_queue_tag,
+	.free_entry = lnx_free_entry,
+	.foreach_unspec_addr = lnx_foreach_unspec_addr,
+};
+
+/*
+ * Discard a previously claimed message.
+ *
+ * Called from lnx_process_recv() for FI_DISCARD without FI_PEEK; @rx_entry is
+ * the pointer that lnx_peek() stored in context->internal[0] when the message
+ * was claimed.
+ *
+ * The core provider is asked to drop the message (a failure there is only
+ * logged), a completion is written to the endpoint's receive CQ and the
+ * entry is released with lnx_free_entry().
+ *
+ * Returns the result of ofi_cq_write().
+ */
+static int lnx_discard(struct lnx_ep *lep, struct lnx_rx_entry *rx_entry,
+		       void *context)
+{
+	struct lnx_core_ep *cep = rx_entry->rx_cep;
+	int rc;
+
+	rc = cep->cep_srx.peer_ops->discard_tag(&rx_entry->rx_entry);
+	if (rc) {
+		FI_WARN(&lnx_prov, FI_LOG_CORE,
+			"Error discarding message from %s\n",
+			cep->cep_domain->cd_info->fabric_attr->name);
+	}
+
+	rc = ofi_cq_write(lep->le_ep.rx_cq, context, rx_entry->rx_entry.flags,
+			  rx_entry->rx_entry.msg_size, NULL,
+			  rx_entry->rx_entry.cq_data, rx_entry->rx_entry.tag);
+
+	/* A claimed entry was already unlinked by lnx_peek(), which also
+	 * re-initialises its links, so this second unlink is a no-op.
+	 */
+	dlist_remove(&rx_entry->entry);
+	lnx_free_entry(&rx_entry->rx_entry);
+
+	return rc;
+}
+
+/*
+ * Handle FI_PEEK on the tagged unexpected queue.
+ *
+ * If no entry matches @match_attr the miss is reported through
+ * ofi_cq_write_error_peek() and its result is returned. Otherwise a
+ * completion describing the matched message is written to the receive CQ.
+ * If that write fails its error is returned and the message is left on the
+ * queue untouched, so it is neither discarded nor claimed without the caller
+ * being told.
+ *
+ * With FI_DISCARD the core provider is told to drop the message, the entry
+ * is unlinked and freed, and the result of discard_tag() is returned.
+ * With FI_CLAIM the entry is unlinked and remembered in
+ * context->internal[0] for a later FI_CLAIM or FI_DISCARD request.
+ */
+static int lnx_peek(struct lnx_ep *lep, struct lnx_match_attr *match_attr,
+		    void *context, uint64_t flags)
+{
+	struct lnx_peer_srq *lnx_srq = &lep->le_srq;
+	struct lnx_rx_entry *rx_entry;
+	int rc;
+
+	rx_entry = lnx_find_first_match(&lnx_srq->lps_trecv.lqp_unexq,
+					match_attr);
+	if (!rx_entry) {
+		FI_DBG(&lnx_prov, FI_LOG_CORE,
+		       "PEEK addr=%" PRIx64 " tag=%" PRIx64 " ignore=%"
+		       PRIx64 "\n", match_attr->lm_addr, match_attr->lm_tag,
+		       match_attr->lm_ignore);
+		return ofi_cq_write_error_peek(lep->le_ep.rx_cq,
+					       match_attr->lm_tag, context);
+	}
+
+	rc = ofi_cq_write(lep->le_ep.rx_cq, context,
+			  rx_entry->rx_entry.flags,
+			  rx_entry->rx_entry.msg_size, NULL,
+			  rx_entry->rx_entry.cq_data,
+			  rx_entry->rx_entry.tag);
+	if (rc)
+		return rc;
+
+	if (flags & FI_DISCARD) {
+		rc = rx_entry->rx_cep->cep_srx.peer_ops->discard_tag(
+			&rx_entry->rx_entry);
+		dlist_remove(&rx_entry->entry);
+		lnx_free_entry(&rx_entry->rx_entry);
+		return rc;
+	}
+
+	if (flags & FI_CLAIM) {
+		/* Unlink and reset the links: lnx_discard() unlinks the
+		 * claimed entry again and must not follow pointers to
+		 * neighbours that may have left the queue in the meantime.
+		 */
+		dlist_remove(&rx_entry->entry);
+		dlist_init(&rx_entry->entry);
+		((struct fi_context *)context)->internal[0] = rx_entry;
+	}
+
+	return rc;
+}
+
+/*
+ * Post a receive on the lnx endpoint @lep.
+ *
+ * The match address is FI_ADDR_UNSPEC when @addr is FI_ADDR_UNSPEC or the
+ * endpoint lacks FI_DIRECTED_RECV; otherwise it is the encoded form of @addr.
+ *
+ * Flag handling, in order:
+ *  - FI_PEEK:    handled by lnx_peek().
+ *  - FI_DISCARD: the entry stored in context->internal[0] is handed to
+ *                lnx_discard().
+ *  - FI_CLAIM:   the entry stored in context->internal[0] is received.
+ *  - otherwise:  the tagged unexpected queue is searched for a match.
+ *
+ * When an unexpected entry is available it is re-initialised with the
+ * caller's buffers, its msg_size is capped at the total length of @iov, and
+ * the core provider that queued it is asked to complete it (start_tag() when
+ * @tagged, start_msg() otherwise). If nothing matches, a new entry is
+ * allocated and appended to the receive queue.
+ *
+ * Returns 0 once the request has been handed to the core provider or
+ * queued, the result of lnx_peek()/lnx_discard() for those flags, the error
+ * from lnx_mr_regattr_core(), -FI_ENOMEM if no entry could be allocated, or
+ * -FI_EINPROGRESS when the core provider reported that and the entry was
+ * placed on the receive queue.
+ */
+int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc,
+		     fi_addr_t addr, size_t count, uint64_t tag,
+		     uint64_t ignore, void *context, uint64_t flags,
+		     bool tagged)
+{
+	struct lnx_peer_srq *lnx_srq = &lep->le_srq;
+	struct lnx_rx_entry *rx_entry;
+	struct lnx_match_attr match_attr;
+	struct lnx_core_ep *cep;
+	int rc = 0;
+	fi_addr_t sub_addr, encoded_addr = lnx_encode_fi_addr(addr, 0);
+
+	/* Matching format should always be in the encoded form */
+	match_attr.lm_addr = ((addr == FI_ADDR_UNSPEC) ||
+			      !(lep->le_ep.caps & FI_DIRECTED_RECV)) ?
+			     FI_ADDR_UNSPEC : encoded_addr;
+	match_attr.lm_ignore = ignore;
+	match_attr.lm_tag = tag;
+
+	if (flags & FI_PEEK)
+		return lnx_peek(lep, &match_attr, context, flags);
+
+	if (flags & FI_DISCARD) {
+		rx_entry = (struct lnx_rx_entry *)
+			(((struct fi_context *)context)->internal[0]);
+		return lnx_discard(lep, rx_entry, context);
+	}
+
+	if (flags & FI_CLAIM) {
+		rx_entry = (struct lnx_rx_entry *)
+			(((struct fi_context *)context)->internal[0]);
+	} else {
+		/* NOTE(review): the tagged queue pair (lps_trecv) is used
+		 * here and at insert_recvq even when 'tagged' is false,
+		 * while lnx_foreach_unspec_addr() also maintains lps_recv.
+		 * Confirm against lnx_get_msg()/lnx_queue_msg().
+		 */
+		rx_entry = lnx_remove_first_match(&lnx_srq->lps_trecv.lqp_unexq,
+						  &match_attr);
+		if (!rx_entry) {
+			FI_DBG(&lnx_prov, FI_LOG_CORE,
+			       "addr=%" PRIx64 " tag=%" PRIx64 " ignore=%"
+			       PRIx64 " buf=%p len=%zu not found\n",
+			       addr, tag, ignore, iov->iov_base, iov->iov_len);
+			goto nomatch;
+		}
+	}
+
+	FI_DBG(&lnx_prov, FI_LOG_CORE,
+	       "addr=%" PRIx64 " tag=%" PRIx64 " ignore=%" PRIx64
+	       " buf=%p len=%zu found\n",
+	       addr, tag, ignore, iov->iov_base, iov->iov_len);
+
+	/* match is found in the unexpected queue. call into the core
+	 * provider to complete this message
+	 */
+	cep = rx_entry->rx_cep;
+	sub_addr = lnx_get_core_addr(cep, rx_entry->rx_entry.addr);
+	lnx_init_rx_entry(rx_entry, iov, desc, count, sub_addr, tag, ignore,
+			  context, rx_entry->rx_entry.flags | flags);
+	rx_entry->rx_entry.msg_size = MIN(ofi_total_iov_len(iov, count),
+					  rx_entry->rx_entry.msg_size);
+
+	if (desc) {
+		rc = lnx_mr_regattr_core(cep->cep_domain, desc,
+					 rx_entry->rx_entry.desc);
+		/* NOTE(review): on this error path the entry taken off the
+		 * unexpected queue appears to be neither requeued nor freed.
+		 */
+		if (rc)
+			return rc;
+	}
+
+	lnx_set_send_pair[lep->le_mr](lep, cep, match_attr.lm_addr);
+
+	if (tagged)
+		rc = cep->cep_srx.peer_ops->start_tag(&rx_entry->rx_entry);
+	else
+		rc = cep->cep_srx.peer_ops->start_msg(&rx_entry->rx_entry);
+
+	if (rc == -FI_EINPROGRESS) {
+		/* this is telling me that more messages can match the same
+		 * rx_entry. So keep it on the queue
+		 */
+		FI_DBG(&lnx_prov, FI_LOG_CORE,
+		       "addr = %" PRIx64 " tag = %" PRIx64 " ignore = %" PRIx64
+		       " start_tag() in progress\n",
+		       addr, tag, ignore);
+
+		goto insert_recvq;
+	}
+
+	/* A failed start is only logged; the caller still gets 0, as before.
+	 * The success trace is emitted only when the start really succeeded.
+	 */
+	if (rc) {
+		FI_WARN(&lnx_prov, FI_LOG_CORE,
+			"start tag failed with %d\n", rc);
+	} else {
+		FI_DBG(&lnx_prov, FI_LOG_CORE,
+		       "addr = %" PRIx64 " tag = %" PRIx64 " ignore = %" PRIx64
+		       " start_tag() success\n",
+		       addr, tag, ignore);
+	}
+
+	return 0;
+
+nomatch:
+	/* nothing on the unexpected queue, then allocate one and put it on
+	 * the receive queue
+	 */
+	rx_entry = get_rx_entry(NULL, iov, (desc) ? &desc : NULL, count,
+				match_attr.lm_addr, tag, ignore, context,
+				flags);
+	if (!rx_entry) {
+		rc = -FI_ENOMEM;
+		goto out;
+	}
+
+insert_recvq:
+	lnx_insert_rx_entry(&lnx_srq->lps_trecv.lqp_recvq, rx_entry);
+
+out:
+	return rc;
+}