Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fabtests/test_configs/lnx/lnx.exclude
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ sighandler_test
# Uses FI_RMA
multi_mr
rma
mr_test

# Tests that are broken
recv_cancel
Expand Down
5 changes: 3 additions & 2 deletions prov/lnx/Makefile.include
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ _lnx_files = \
prov/lnx/src/lnx_domain.c \
prov/lnx/src/lnx_ep.c \
prov/lnx/src/lnx_init.c \
prov/lnx/src/lnx_ops.c \
prov/lnx/src/lnx_srx.c \
prov/lnx/src/lnx_mr.c \
prov/lnx/src/lnx_av.c
prov/lnx/src/lnx_av.c \
prov/lnx/src/lnx_msg.c

_lnx_headers = \
prov/lnx/include/lnx.h
Expand Down
212 changes: 44 additions & 168 deletions prov/lnx/include/lnx.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2022 ORNL. All rights reserved.
* Copyright (c) Intel Corporation. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
Expand Down Expand Up @@ -33,17 +34,31 @@
#ifndef LNX_H
#define LNX_H

#define LNX_NUM_HISTORY 4096
#include "ofi_lock.h"
#include "ofi_util.h"

#define LNX_MAX_LOCAL_EPS 16
#define LNX_IOV_LIMIT 4
#define LNX_MAX_PRIMARY_ID ((1ULL << 56) - 1)
#define LNX_MAX_SUB_ID ((1ULL << 8) - 1)

#define LNX_PER_MSG_SELECTION_STR "PER_MSG"
#define LNX_PER_PEER_SELECTION_STR "PER_PEER"
#define LNX_SUB_ID_BITS 8
#define LNX_MAX_SUB_ID ((1ULL << LNX_SUB_ID_BITS) - 1)

#define lnx_ep_rx_flags(lnx_ep) ((lnx_ep)->le_ep.rx_op_flags)

enum lnx_multirail_selection {
LNX_MR_SELECTION_PER_MSG = 0,
LNX_MR_SELECTION_PER_PEER,
LNX_MR_SELECTION_MAX,
};

struct lnx_env {
enum lnx_multirail_selection mr;
char *prov_links;
int disable_shm;
int dump_stats;
};

extern struct lnx_env lnx_env;

struct lnx_match_attr {
fi_addr_t lm_addr;
uint64_t lm_tag;
Expand Down Expand Up @@ -167,12 +182,6 @@ struct lnx_domain {
int ld_ep_idx;
};

enum lnx_multirail_selection {
LNX_MR_SELECTION_PER_MSG = 0,
LNX_MR_SELECTION_PER_PEER,
LNX_MR_SELECTION_MAX,
};

struct lnx_ep {
int le_idx;
struct util_ep le_ep;
Expand Down Expand Up @@ -224,76 +233,61 @@ struct lnx_rx_entry {
bool rx_global;
};

OFI_DECLARE_FREESTACK(struct lnx_rx_entry, lnx_recv_fs);

extern struct fi_ops_msg lnx_msg_ops;
extern struct fi_ops_tagged lnx_tagged_ops;
extern struct fi_ops_rma lnx_rma_ops;
extern struct fi_ops_atomic lnx_atomic_ops;
extern struct fi_ops_srx_owner lnx_srx_ops;

extern struct util_prov lnx_util_prov;
extern struct fi_provider lnx_prov;
extern struct ofi_bufpool *global_recv_bp;
extern ofi_spin_t global_bplock;

int lnx_getinfo(uint32_t version, const char *node, const char *service,
uint64_t flags, const struct fi_info *hints,
struct fi_info **info);

int lnx_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric,
void *context);
int lnx_setup_fabrics(char *name, struct lnx_fabric *lnx_fab, void *context);

void lnx_fini(void);

int lnx_fabric_close(struct fid *fid);

int lnx_domain_open(struct fid_fabric *fabric, struct fi_info *info,
struct fid_domain **dom, void *context);

int lnx_av_open(struct fid_domain *domain, struct fi_av_attr *attr,
struct fid_av **av, void *context);

struct lnx_peer *
lnx_av_lookup_addr(struct lnx_av *av, fi_addr_t addr);
struct lnx_peer *lnx_av_lookup_addr(struct lnx_av *av, fi_addr_t addr);

int lnx_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
struct fid_cq **cq, void *context);

int lnx_endpoint(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep, void *context);

int lnx_cq2ep_bind(struct fid *fid, struct fid *bfid, uint64_t flags);

int lnx_get_msg(struct fid_peer_srx *srx, struct fi_peer_match_attr *match,
struct fi_peer_rx_entry **entry);
int lnx_get_tag(struct fid_peer_srx *srx, struct fi_peer_match_attr *match,
struct fi_peer_rx_entry **entry);
int lnx_queue_msg(struct fi_peer_rx_entry *entry);
int lnx_queue_tag(struct fi_peer_rx_entry *entry);
void lnx_free_entry(struct fi_peer_rx_entry *entry);
void lnx_foreach_unspec_addr(struct fid_peer_srx *srx,
fi_addr_t (*get_addr)(struct fi_peer_rx_entry *));

int lnx_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
uint64_t flags, struct fid_mr **mr_fid);
int lnx_mr_regattr_core(struct lnx_core_domain *cd, void *desc,
void **core_desc);

static inline fi_addr_t
lnx_encode_fi_addr(uint64_t primary_id, uint8_t sub_id)
int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc,
fi_addr_t addr, size_t count, uint64_t tag,
uint64_t ignore, void *context, uint64_t flags,
bool tagged);

static inline fi_addr_t lnx_encode_fi_addr(uint64_t primary_id, uint8_t sub_id)
{
return (primary_id << 8) | sub_id;
}

static inline fi_addr_t lnx_decode_primary_id(uint64_t fi_addr)
{
return (fi_addr == FI_ADDR_UNSPEC) ? fi_addr : fi_addr >> 8;
return (fi_addr == FI_ADDR_UNSPEC) ? fi_addr :
fi_addr >> LNX_SUB_ID_BITS;
}

static inline uint8_t lnx_decode_sub_id(uint64_t fi_addr)
{
return (fi_addr == FI_ADDR_UNSPEC) ? fi_addr : fi_addr & LNX_MAX_SUB_ID;
}

static inline fi_addr_t
lnx_get_core_addr(struct lnx_core_ep *cep, fi_addr_t addr)
static inline fi_addr_t lnx_get_core_addr(struct lnx_core_ep *cep,
fi_addr_t addr)
{
struct lnx_peer_map *map_addr;
uint8_t idx = lnx_decode_sub_id(addr);
Expand All @@ -306,131 +300,13 @@ lnx_get_core_addr(struct lnx_core_ep *cep, fi_addr_t addr)
return map_addr->map_addrs[idx];
}

/*
* Round robin over the end points and peer addresses per message
* This does not take into consideration send after send requirements.
*/
static inline int
lnx_select_send_endpoints_msg(struct lnx_ep *lep, fi_addr_t lnx_addr,
struct lnx_core_ep **cep_out, fi_addr_t *core_addr)
{
int idx, rr;
struct lnx_peer *lp;
struct lnx_core_ep *cep;
struct lnx_peer_map *map_addr;
struct lnx_peer_ep_map *ep_map;

lp = lnx_av_lookup_addr(lep->le_lav, lnx_addr);
if (!lp)
return -FI_ENOSYS;

/* round robin over the endpoints which can reach this peer */
rr = ofi_atomic_inc32(&lp->lp_ep_rr) - 1;
ep_map = &lp->lp_src_eps[lep->le_idx];
idx = rr % ep_map->pem_num_eps;
cep = ep_map->pem_eps[idx];

map_addr = ofi_bufpool_get_ibuf(cep->cep_cav->cav_map, lp->lp_addr);

/* round robin over available peer addresses */
rr = ofi_atomic_inc32(&map_addr->map_rr) - 1;
idx = rr % (map_addr->map_count);

*core_addr = map_addr->map_addrs[idx];

*cep_out = cep;

return FI_SUCCESS;
}

/*
* Round robin over the end points, assigning each peer a particular local
* endpoint and one of its core addresses to use. This ensures that the
* peer receives messages in the same order they are sent.
*/
static inline int
lnx_select_send_endpoints_peer(struct lnx_ep *lep, fi_addr_t lnx_addr,
struct lnx_core_ep **cep_out, fi_addr_t *core_addr)
{
bool found = false;
int idx, i;
uint32_t rr, seed;
struct lnx_peer *lp;
struct lnx_core_ep *cep;
struct lnx_peer_map *map_addr;
struct lnx_peer_ep_map *ep_map;

lp = lnx_av_lookup_addr(lep->le_lav, lnx_addr);
if (!lp)
return -FI_ENOSYS;

if (lp->lp_locked_cep) {
*core_addr = lp->lp_locked_core_addr;
*cep_out = lp->lp_locked_cep;
return FI_SUCCESS;
}

ep_map = &lp->lp_src_eps[lep->le_idx];

while (!found) {
rr = ofi_atomic_inc32(&lep->le_rr) - 1;
idx = rr % lep->le_domain->ld_num_doms;
cep = &lep->le_core_eps[idx];

for (i = 0; i < ep_map->pem_num_eps; i++) {
if (cep == ep_map->pem_eps[i]) {
found = true;
break;
}
}
}

map_addr = ofi_bufpool_get_ibuf(cep->cep_cav->cav_map, lp->lp_addr);

/* randomize the address selection from the list of reachable
* peer addresses. This will allow different processes to send to
* different peer addresses, instead of using the first address of
* a peer. This is not round robin over the peer addresses, but
* in the PER_PEER case, there is no effective way to enforce
* round robin across independent peers. In this method it at
* least allows spreading traffic across peer addresses. */
seed = (uint32_t)ofi_gettime_ns();
rr = ofi_xorshift_random(seed);
idx = rr % (map_addr->map_count);

*core_addr = lp->lp_locked_core_addr = map_addr->map_addrs[idx];
*cep_out = lp->lp_locked_cep = cep;

return FI_SUCCESS;
}

static inline void
lnx_set_send_pair_noop(struct lnx_ep *lep, struct lnx_core_ep *cep, fi_addr_t addr)
{
}

/* if you haven't sent to a particular peer before then lock the
* address you're going to send on. If this is the first message
* you receive from peer, then you want to respond on the same
* address and core endpoint that peer used. This will ensure
* symmetry.
*/
static inline void
lnx_set_send_pair_peer(struct lnx_ep *lep, struct lnx_core_ep *cep, fi_addr_t addr)
{
fi_addr_t core_addr, prim_addr;
struct lnx_peer *lp;
typedef int (*lnx_select_tx_ep)(
struct lnx_ep *lep, fi_addr_t lnx_addr,
struct lnx_core_ep **cep_out, fi_addr_t *core_addr);
extern lnx_select_tx_ep lnx_select_send_endpoints[LNX_MR_SELECTION_MAX];

if (addr == FI_ADDR_UNSPEC)
return;

prim_addr = lnx_decode_primary_id(addr);
lp = lnx_av_lookup_addr(lep->le_lav, prim_addr);
if (!lp->lp_locked_cep) {
core_addr = lnx_get_core_addr(cep, addr);
lp->lp_locked_core_addr = core_addr;
lp->lp_locked_cep = cep;
}
}
typedef void (*lnx_set_tx_pair)(
struct lnx_ep *lep, struct lnx_core_ep *cep, fi_addr_t addr);
extern lnx_set_tx_pair lnx_set_send_pair[LNX_MR_SELECTION_MAX];

#endif /* LNX_H */
Loading
Loading