diff --git a/fabtests/test_configs/lnx/all.test b/fabtests/test_configs/lnx/all.test index 29bc1faf6b6..2ade5384316 100644 --- a/fabtests/test_configs/lnx/all.test +++ b/fabtests/test_configs/lnx/all.test @@ -1,7 +1,6 @@ { prov_name: lnx, test_type: [ - FT_TEST_LATENCY, FT_TEST_BANDWIDTH, ], class_function: [ @@ -16,6 +15,7 @@ FI_EP_RDM, ], test_class: [ + FT_CAP_MSG, FT_CAP_TAGGED, ], comp_type: [ @@ -25,6 +25,7 @@ { prov_name: lnx, test_type: [ + FT_TEST_LATENCY, FT_TEST_UNIT ], class_function: [ @@ -39,10 +40,79 @@ FI_EP_RDM, ], test_class: [ + FT_CAP_MSG, FT_CAP_TAGGED, ], comp_type: [ FT_COMP_QUEUE, ], test_flags: FT_FLAG_QUICKTEST +}, +{ + prov_name: lnx, + test_type: [ + FT_TEST_BANDWIDTH, + ], + class_function: [ + FT_FUNC_WRITE, + FT_FUNC_WRITEV, + FT_FUNC_WRITEMSG, + FT_FUNC_INJECT_WRITE, + FT_FUNC_WRITEDATA, + FT_FUNC_INJECT_WRITEDATA, + FT_FUNC_READ, + FT_FUNC_READV, + FT_FUNC_READMSG, + ], + ep_type: [ + FI_EP_RDM, + ], + test_class: [ + FT_CAP_RMA, + ], + mr_mode: [ + FI_MR_LOCAL, + FI_MR_VIRT_ADDR, + FI_MR_ALLOCATED, + FI_MR_PROV_KEY, + FI_MR_RAW, + ], + comp_type: [ + FT_COMP_QUEUE, + ], +}, +{ + prov_name: lnx, + test_type: [ + FT_TEST_LATENCY, + FT_TEST_UNIT, + ], + class_function: [ + FT_FUNC_WRITE, + FT_FUNC_WRITEV, + FT_FUNC_WRITEMSG, + FT_FUNC_INJECT_WRITE, + FT_FUNC_WRITEDATA, + FT_FUNC_INJECT_WRITEDATA, + FT_FUNC_READ, + FT_FUNC_READV, + FT_FUNC_READMSG, + ], + ep_type: [ + FI_EP_RDM, + ], + test_class: [ + FT_CAP_RMA, + ], + mr_mode: [ + FI_MR_LOCAL, + FI_MR_VIRT_ADDR, + FI_MR_ALLOCATED, + FI_MR_PROV_KEY, + FI_MR_RAW, + ], + comp_type: [ + FT_COMP_QUEUE, + ], + test_flags: FT_FLAG_QUICKTEST } \ No newline at end of file diff --git a/fabtests/test_configs/lnx/lnx.exclude b/fabtests/test_configs/lnx/lnx.exclude index e49287a21b0..ee0c78be9c8 100644 --- a/fabtests/test_configs/lnx/lnx.exclude +++ b/fabtests/test_configs/lnx/lnx.exclude @@ -11,28 +11,18 @@ cm_data atomic scalable_ep shared_ctx - -# Uses SOCKADDR -av_test - -# Uses FI_MSG 
-multinode -rdm_pingpong multi_recv cntr_pingpong -multi_client -flood multi_ep -cq_data -av_xfer shared_av -fi_rdm\b -sighandler_test - -# Uses FI_RMA +-k +rma_event +rma_trigger multi_mr -rma -mr_test +multinode + +# Uses SOCKADDR +av_test # Tests that are broken recv_cancel diff --git a/fabtests/ubertest/test_ctrl.c b/fabtests/ubertest/test_ctrl.c index 4e80685240d..2ddae88d9c7 100644 --- a/fabtests/ubertest/test_ctrl.c +++ b/fabtests/ubertest/test_ctrl.c @@ -1039,7 +1039,8 @@ static int ft_exchange_mr_addr_key(void) uint64_t addr; int ret; - if (!(test_info.mr_mode & (FI_MR_VIRT_ADDR | FI_MR_PROV_KEY))) + if (!(test_info.mr_mode & + (FI_MR_VIRT_ADDR | FI_MR_PROV_KEY | FI_MR_RAW))) return 0; if (test_info.mr_mode & FI_MR_VIRT_ADDR) diff --git a/prov/lnx/Makefile.include b/prov/lnx/Makefile.include index 6b6f1ac9816..2258ec32574 100644 --- a/prov/lnx/Makefile.include +++ b/prov/lnx/Makefile.include @@ -40,7 +40,8 @@ _lnx_files = \ prov/lnx/src/lnx_srx.c \ prov/lnx/src/lnx_mr.c \ prov/lnx/src/lnx_av.c \ - prov/lnx/src/lnx_msg.c + prov/lnx/src/lnx_msg.c \ + prov/lnx/src/lnx_rma.c _lnx_headers = \ prov/lnx/include/lnx.h diff --git a/prov/lnx/include/lnx.h b/prov/lnx/include/lnx.h index aafd3a8f884..7785ef77cb6 100644 --- a/prov/lnx/include/lnx.h +++ b/prov/lnx/include/lnx.h @@ -96,6 +96,8 @@ struct lnx_core_domain { struct lnx_core_fabric *cd_fabric; struct fi_info *cd_info; uint64_t cd_num_sends; + uint64_t key_seed; + int idx; }; struct lnx_core_av { @@ -114,8 +116,23 @@ struct lnx_t_traffic_stats { uint64_t st_num_tsenddata; uint64_t st_num_tinject; uint64_t st_num_tinjectdata; + uint64_t st_num_send; + uint64_t st_num_sendv; + uint64_t st_num_sendmsg; + uint64_t st_num_senddata; + uint64_t st_num_inject; + uint64_t st_num_injectdata; uint64_t st_num_posted_recvs; uint64_t st_num_unexp_msgs; + uint64_t st_num_read; + uint64_t st_num_readv; + uint64_t st_num_readmsg; + uint64_t st_num_write; + uint64_t st_num_writev; + uint64_t st_num_writemsg; + uint64_t 
st_num_writedata; + uint64_t st_num_inject_write; + uint64_t st_num_inject_writedata; }; struct lnx_core_ep { @@ -169,8 +186,15 @@ struct lnx_av { struct lnx_mr { struct ofi_mr lm_mr; struct fi_mr_attr lm_attr; - struct fid_mr *lm_core_mr; + size_t key_size; + struct fid_mr **lm_core_mr; struct iovec lm_iov[LNX_IOV_LIMIT]; + void *raw_key; +}; + +struct lnx_mr_key { + uint64_t base_addr; + uint64_t prov_keys[]; }; struct lnx_domain { @@ -187,7 +211,6 @@ struct lnx_ep { struct util_ep le_ep; struct lnx_core_ep *le_core_eps; struct ofi_bufpool *le_recv_bp; - ofi_spin_t le_bplock; struct lnx_domain *le_domain; size_t le_fclass; struct lnx_peer_srq le_srq; @@ -230,7 +253,6 @@ struct lnx_rx_entry { struct lnx_ep *rx_lep; struct lnx_core_ep *rx_cep; uint64_t rx_ignore; - bool rx_global; }; extern struct fi_ops_msg lnx_msg_ops; @@ -241,8 +263,6 @@ extern struct fi_ops_srx_owner lnx_srx_ops; extern struct util_prov lnx_util_prov; extern struct fi_provider lnx_prov; -extern struct ofi_bufpool *global_recv_bp; -extern ofi_spin_t global_bplock; int lnx_setup_fabrics(char *name, struct lnx_fabric *lnx_fab, void *context); @@ -260,12 +280,19 @@ int lnx_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, int lnx_endpoint(struct fid_domain *domain, struct fi_info *info, struct fid_ep **ep, void *context); + +int lnx_mr_reg(struct fid *fid, const void *buf, size_t len, uint64_t access, + uint64_t offset, uint64_t requested_key, uint64_t flags, + struct fid_mr **mr, void *context); +int lnx_mr_regv(struct fid *fid, const struct iovec *iov, size_t count, + uint64_t access, uint64_t offset, uint64_t requested_key, + uint64_t flags, struct fid_mr **mr, void *context); int lnx_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr, uint64_t flags, struct fid_mr **mr_fid); -int lnx_mr_regattr_core(struct lnx_core_domain *cd, void *desc, +int lnx_mr_regattr_core(struct lnx_core_domain *cd, void **desc, size_t count, void **core_desc); -int lnx_process_recv(struct lnx_ep *lep, 
const struct iovec *iov, void *desc, +int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void **desc, fi_addr_t addr, size_t count, uint64_t tag, uint64_t ignore, void *context, uint64_t flags, bool tagged); diff --git a/prov/lnx/src/lnx_av.c b/prov/lnx/src/lnx_av.c index a0b5704e9fe..dec6f7c96a0 100644 --- a/prov/lnx/src/lnx_av.c +++ b/prov/lnx/src/lnx_av.c @@ -101,6 +101,7 @@ static int lnx_peer_av_remove(struct lnx_peer *lp) if (rc) frc = rc; } + ofi_ibuf_free(map_addr); } return frc; diff --git a/prov/lnx/src/lnx_domain.c b/prov/lnx/src/lnx_domain.c index 39dade126e1..3ac30a9dd83 100644 --- a/prov/lnx/src/lnx_domain.c +++ b/prov/lnx/src/lnx_domain.c @@ -76,18 +76,50 @@ static int lnx_domain_close(struct fid *fid) return frc; } +static int lnx_domain_control(struct fid *fid, int command, void *arg) +{ + struct fi_mr_map_raw *map_raw; + struct lnx_mr_key *mapped; + + switch (command) { + case FI_MAP_RAW_MR: + map_raw = arg; + + assert(map_raw->key_size % sizeof(uint64_t) == 0); + mapped = malloc(sizeof(*mapped) + map_raw->key_size); + if (!mapped) + return -FI_ENOMEM; + + memcpy(mapped->prov_keys, map_raw->raw_key, map_raw->key_size); + mapped->base_addr = map_raw->base_addr; + *map_raw->key = (uint64_t)(uintptr_t)mapped; + return FI_SUCCESS; + case FI_UNMAP_KEY: + mapped = (struct lnx_mr_key *)(uintptr_t) *(uint64_t *)arg; + if (!mapped) + return FI_SUCCESS; + + free(mapped); + return FI_SUCCESS; + default: + return -FI_ENOSYS; + } + + return -FI_ENOSYS; +} + static struct fi_ops lnx_domain_fi_ops = { .size = sizeof(struct fi_ops), .close = lnx_domain_close, .bind = fi_no_bind, - .control = fi_no_control, + .control = lnx_domain_control, .ops_open = fi_no_ops_open, }; static struct fi_ops_mr lnx_mr_ops = { .size = sizeof(struct fi_ops_mr), - .reg = fi_no_mr_reg, - .regv = fi_no_mr_regv, + .reg = lnx_mr_reg, + .regv = lnx_mr_regv, .regattr = lnx_mr_regattr, }; @@ -148,6 +180,8 @@ static int lnx_open_core_domains(struct lnx_fabric *lnx_fab, void 
*context, if (rc) return rc; + cd->idx = lnx_dom->ld_num_doms; + cd->key_seed = 1; lnx_dom->ld_num_doms++; cd->cd_fabric = cf; } diff --git a/prov/lnx/src/lnx_ep.c b/prov/lnx/src/lnx_ep.c index 528cca92bed..fd5557b6a1d 100644 --- a/prov/lnx/src/lnx_ep.c +++ b/prov/lnx/src/lnx_ep.c @@ -622,19 +622,43 @@ static void lnx_ep_nosys_progress(struct util_ep *util_ep) assert(0); } -static int lnx_common_match(uint64_t tag, uint64_t match_tag, - fi_addr_t recv_addr, fi_addr_t addr, - uint64_t ignore) +static int lnx_addr_match(fi_addr_t recv_addr, fi_addr_t addr) { fi_addr_t addr1 = lnx_decode_primary_id(recv_addr); fi_addr_t addr2 = lnx_decode_primary_id(addr); + if (recv_addr == FI_ADDR_UNSPEC) + return 1; + + return addr1 == addr2; +} + +static int lnx_tag_match(uint64_t tag, uint64_t match_tag, fi_addr_t recv_addr, + fi_addr_t addr, uint64_t ignore) +{ bool tmatch = ((tag | ignore) == (match_tag | ignore)); - if (recv_addr == FI_ADDR_UNSPEC) - return tmatch; + return tmatch && lnx_addr_match(recv_addr, addr); +} + +static int lnx_match_trecvq(struct dlist_entry *item, const void *args) +{ + struct lnx_match_attr *attr = (struct lnx_match_attr *) args; + struct lnx_rx_entry *entry = (struct lnx_rx_entry *) item; - return tmatch && (addr1 == addr2); + return lnx_tag_match(entry->rx_entry.tag, attr->lm_tag, + entry->rx_entry.addr, attr->lm_addr, + entry->rx_ignore); +} + +static int lnx_match_tunexq(struct dlist_entry *item, const void *args) +{ + struct lnx_match_attr *attr = (struct lnx_match_attr *) args; + struct lnx_rx_entry *entry = (struct lnx_rx_entry *) item; + + return lnx_tag_match(attr->lm_tag, entry->rx_entry.tag, + attr->lm_addr, entry->rx_entry.addr, + attr->lm_ignore); } static int lnx_match_recvq(struct dlist_entry *item, const void *args) @@ -642,9 +666,7 @@ static int lnx_match_recvq(struct dlist_entry *item, const void *args) struct lnx_match_attr *attr = (struct lnx_match_attr *) args; struct lnx_rx_entry *entry = (struct lnx_rx_entry *) item; - return 
lnx_common_match(entry->rx_entry.tag, attr->lm_tag, - entry->rx_entry.addr, attr->lm_addr, - entry->rx_ignore); + return lnx_addr_match(entry->rx_entry.addr, attr->lm_addr); } static int lnx_match_unexq(struct dlist_entry *item, const void *args) @@ -652,9 +674,7 @@ static int lnx_match_unexq(struct dlist_entry *item, const void *args) struct lnx_match_attr *attr = (struct lnx_match_attr *) args; struct lnx_rx_entry *entry = (struct lnx_rx_entry *) item; - return lnx_common_match(attr->lm_tag, entry->rx_entry.tag, - attr->lm_addr, entry->rx_entry.addr, - attr->lm_ignore); + return lnx_addr_match(attr->lm_addr, entry->rx_entry.addr); } static inline void lnx_init_qpair(struct lnx_qpair *qp, @@ -691,8 +711,8 @@ static int lnx_alloc_endpoint(struct fid_domain *domain, struct fi_info *info, lep->le_ep.ep_fid.atomic = &lnx_atomic_ops; lep->le_domain = container_of(domain, struct lnx_domain, ld_domain.domain_fid); - lnx_init_qpair(&lep->le_srq.lps_trecv, lnx_match_recvq, - lnx_match_unexq); + lnx_init_qpair(&lep->le_srq.lps_trecv, lnx_match_trecvq, + lnx_match_tunexq); lnx_init_qpair(&lep->le_srq.lps_recv, lnx_match_recvq, lnx_match_unexq); ofi_genlock_lock(&lep->le_domain->ld_domain.lock); @@ -718,9 +738,6 @@ static int lnx_alloc_endpoint(struct fid_domain *domain, struct fi_info *info, "Failed to create receive buffer pool"); goto fail; } - rc = ofi_spin_init(&lep->le_bplock); - if (rc) - goto fail; rc = lnx_open_core_eps(lep, context); if (rc) @@ -760,19 +777,6 @@ int lnx_endpoint(struct fid_domain *domain, struct fi_info *info, return 0; } -struct fi_ops_rma lnx_rma_ops = { - .size = sizeof(struct fi_ops_rma), - .read = fi_no_rma_read, - .readv = fi_no_rma_readv, - .readmsg = fi_no_rma_readmsg, - .write = fi_no_rma_write, - .writev = fi_no_rma_writev, - .writemsg = fi_no_rma_writemsg, - .inject = fi_no_rma_inject, - .writedata = fi_no_rma_writedata, - .injectdata = fi_no_rma_injectdata, -}; - struct fi_ops_atomic lnx_atomic_ops = { .size = sizeof(struct 
fi_ops_atomic), .write = fi_no_atomic_write, diff --git a/prov/lnx/src/lnx_init.c b/prov/lnx/src/lnx_init.c index a82554fe3dd..aaaf1926c38 100644 --- a/prov/lnx/src/lnx_init.c +++ b/prov/lnx/src/lnx_init.c @@ -43,12 +43,10 @@ FI_DELIVERY_COMPLETE | FI_TRANSMIT_COMPLETE) #define LNX_RX_OP_FLAGS (FI_COMPLETION) -#define LNX_TX_CAPS (FI_TAGGED | FI_SEND | FI_HMEM) -#define LNX_RX_CAPS (FI_TAGGED | FI_RECV | FI_DIRECTED_RECV | \ - FI_HMEM) - -ofi_spin_t global_bplock; -struct ofi_bufpool *global_recv_bp = NULL; +#define LNX_TX_CAPS (OFI_TX_MSG_CAPS | FI_TAGGED | FI_HMEM | \ + OFI_TX_RMA_CAPS) +#define LNX_RX_CAPS (OFI_RX_MSG_CAPS | FI_TAGGED | FI_HMEM | \ + FI_DIRECTED_RECV | OFI_RX_RMA_CAPS) struct util_fabric lnx_fabric_info; struct lnx_env lnx_env = { @@ -313,7 +311,7 @@ static int lnx_generate_link_info(uint32_t version, const struct fi_info *hints, size_t min_rx_size = SIZE_MAX, min_tx_size = SIZE_MAX; size_t min_iov_limit = LNX_IOV_LIMIT; int mr_mode = 0; - int len; + int len, link_cnt = 0; int str_len = 1024; char *tmp, *tmp2; char *link_name; @@ -400,6 +398,7 @@ static int lnx_generate_link_info(uint32_t version, const struct fi_info *hints, tmp += (len - 1); } } + link_cnt++; } link_name = realloc(link_name, strlen(link_name) + 1); @@ -439,6 +438,17 @@ static int lnx_generate_link_info(uint32_t version, const struct fi_info *hints, next->ep_attr->max_msg_size = min_max_msg_size; next->rx_attr->size = min_rx_size; next->tx_attr->size = min_tx_size; + next->domain_attr->mr_key_size = sizeof(uint64_t) * link_cnt; + + if (next->caps & FI_RMA) { + //TODO need to add support for using providers with and + //without FI_MR_VIRT_ADDR + if (!(hints->domain_attr->mr_mode & FI_MR_RAW)) { + fi_freeinfo(next); + return -FI_ENODATA; + } + next->domain_attr->mr_mode |= FI_MR_RAW; + } if (!next) return -FI_ENOMEM; @@ -777,7 +787,6 @@ static int lnx_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric, static void lnx_fini(void) { lnx_free_links(&lnx_links); - 
ofi_bufpool_destroy(global_recv_bp);
 }
 
 void ofi_link_fini(void)
@@ -802,9 +811,6 @@ struct util_prov lnx_util_prov = {
 
 LNX_INI
 {
-	struct ofi_bufpool_attr bp_attrs = {};
-	int ret;
-
 	fi_param_define(&lnx_prov, "multi_rail_selection", FI_PARAM_STRING,
 			"Specify which Multi-Rail endpoint selection "
 			"algorithm to use. One of: PER_MSG, PER_PEER. "
@@ -822,20 +828,5 @@ LNX_INI
 
 	dlist_init(&lnx_links);
 
-	if (!global_recv_bp) {
-		bp_attrs.size = sizeof(struct lnx_rx_entry);
-		bp_attrs.alignment = 8;
-		bp_attrs.max_cnt = UINT16_MAX;
-		bp_attrs.chunk_cnt = 64;
-		bp_attrs.flags = OFI_BUFPOOL_NO_TRACK;
-		ret = ofi_bufpool_create_attr(&bp_attrs, &global_recv_bp);
-		if (ret) {
-			FI_WARN(&lnx_prov, FI_LOG_FABRIC,
-				"Failed to create receive buffer pool");
-			return NULL;
-		}
-		ofi_spin_init(&global_bplock);
-	}
-
 	return &lnx_prov;
 }
diff --git a/prov/lnx/src/lnx_mr.c b/prov/lnx/src/lnx_mr.c
index 9450e4a714e..e6d1b87a0f4 100644
--- a/prov/lnx/src/lnx_mr.c
+++ b/prov/lnx/src/lnx_mr.c
@@ -42,41 +42,55 @@
  * target core provider we can do memory registration at that point
  */
 
-int lnx_mr_regattr_core(struct lnx_core_domain *cd, void *desc,
+int lnx_mr_regattr_core(struct lnx_core_domain *cd, void **desc, size_t count,
 			void **core_desc)
 {
-	int rc;
-	struct lnx_mr *lm;
-
-	lm = (struct lnx_mr *)desc;
-	if (lm->lm_core_mr)
-		goto out;
+	int rc, i;
+	struct lnx_mr **lm;
 
-	rc = fi_mr_regattr(cd->cd_domain, &lm->lm_attr, lm->lm_mr.flags,
-			   &lm->lm_core_mr);
-	if (rc)
-		return rc;
+	lm = (struct lnx_mr **)desc;
+	if (!lm)
+		return FI_SUCCESS;
+
+	for (i = 0; i < count; i++) {
+		if (!lm[i]) {
+			core_desc[i] = NULL;
+			continue;
+		}
+		if (!lm[i]->lm_core_mr[cd->idx]) {
+			rc = fi_mr_regattr(cd->cd_domain, &lm[i]->lm_attr,
+					   lm[i]->lm_mr.flags,
+					   &lm[i]->lm_core_mr[cd->idx]);
+			if (rc)
+				return rc;
+		}
+		core_desc[i] = fi_mr_desc(lm[i]->lm_core_mr[cd->idx]);
+	}
 
-out:
-	*core_desc = lm->lm_core_mr->mem_desc;
 	return FI_SUCCESS;
 }
 
 static int lnx_mr_close(struct fid *fid)
 {
 	int rc, frc = FI_SUCCESS;
+	struct lnx_domain *domain;
 	struct lnx_mr *lm;
+	int i;
 
 	lm = container_of(fid, struct lnx_mr, lm_mr.mr_fid.fid);
-
-	if (lm->lm_core_mr) {
-		rc = fi_close(&lm->lm_core_mr->fid);
-		if (rc)
-			frc = rc;
+	domain = container_of(lm->lm_mr.domain, struct lnx_domain, ld_domain);
+
+	for (i = 0; i < domain->ld_num_doms; i++) {
+		if (lm->lm_core_mr[i]) {
+			rc = fi_close(&lm->lm_core_mr[i]->fid);
+			if (rc)
+				frc = rc;
+		}
 	}
-
-	ofi_atomic_dec32(&lm->lm_mr.domain->ref);
-
+	/* raw_key is set only for remotely accessible MRs; free(NULL) is a no-op */
+	free(lm->raw_key);
+	free(lm->lm_core_mr);
+	ofi_atomic_dec32(&domain->ld_domain.ref);
 	ofi_buf_free(lm);
 
 	return frc;
@@ -89,7 +103,29 @@ static int lnx_mr_bind(struct fid *fid, struct fid *bfid, uint64_t flags)
 
 static int lnx_mr_control(struct fid *fid, int command, void *arg)
 {
-	return -FI_ENOSYS;
+	struct lnx_mr *lm;
+	struct fi_mr_raw_attr *raw_attr = arg;
+
+	if (command != FI_GET_RAW_MR)
+		return -FI_ENOSYS;
+
+	lm = container_of(fid, struct lnx_mr, lm_mr.mr_fid.fid);
+	/* MRs registered without remote access never build a raw key */
+	if (!lm->raw_key)
+		return -FI_ENOKEY;
+
+	if (*raw_attr->key_size < lm->key_size) {
+		FI_WARN(&lnx_prov, FI_LOG_MR,
+			"Raw key buffer is too small: input %zu, needed %zu\n",
+			*raw_attr->key_size, lm->key_size);
+		*raw_attr->key_size = lm->key_size;
+		return -FI_ETOOSMALL;
+	}
+
+	memcpy(raw_attr->raw_key, lm->raw_key, lm->key_size);
+	*raw_attr->key_size = lm->key_size;
+
+	return FI_SUCCESS;
 }
 
 static struct fi_ops lnx_mr_fi_ops = {
@@ -103,9 +139,13 @@ static struct fi_ops lnx_mr_fi_ops = {
 int lnx_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
 		   uint64_t flags, struct fid_mr **mr_fid)
 {
+	struct lnx_core_domain *cd;
 	struct lnx_domain *domain;
 	struct ofi_mr *mr;
 	struct lnx_mr *lm = NULL;
+	struct fi_mr_attr core_attr;
+	uint64_t key;
+	int rc, i;
 
 	if (fid->fclass != FI_CLASS_DOMAIN || !attr ||
 	    attr->iov_count <= 0 || attr->iov_count > LNX_IOV_LIMIT)
@@ -119,10 +159,17 @@ int lnx_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
 
 	memset(lm, 0, sizeof(*lm));
 
+	core_attr = *attr;
 	lm->lm_attr = *attr;
 	memcpy(lm->lm_iov, attr->mr_iov,
-		sizeof(struct iovec) * attr->iov_count);
+	       sizeof(*attr->mr_iov) * attr->iov_count);
 	lm->lm_attr.mr_iov = lm->lm_iov;
+	lm->lm_core_mr = calloc(domain->ld_num_doms, sizeof(*lm->lm_core_mr));
+	if (!lm->lm_core_mr) {
+		/* lnx_mr_close() releases lm with ofi_buf_free(); match it here */
+		ofi_buf_free(lm);
+		return -FI_ENOMEM;
+	}
 	mr = &lm->lm_mr;
 	mr->mr_fid.fid.fclass = FI_CLASS_MR;
 	mr->mr_fid.fid.ops = &lnx_mr_fi_ops;
@@ -130,8 +177,79 @@ int lnx_mr_regattr(struct fid *fid, const struct fi_mr_attr *attr,
 	mr->domain = &domain->ld_domain;
 	mr->flags = flags;
 
+	if (attr->access & (FI_REMOTE_WRITE | FI_REMOTE_READ)) {
+		lm->key_size = sizeof(uint64_t) * domain->ld_num_doms;
+		lm->raw_key = malloc(lm->key_size);
+		if (!lm->raw_key) {
+			rc = -FI_ENOMEM;
+			goto err;
+		}
+
+		for (i = 0; i < domain->ld_num_doms; i++) {
+			cd = &domain->ld_core_domains[i];
+
+			if (!(cd->cd_info->domain_attr->mr_mode &
+			      FI_MR_PROV_KEY))
+				core_attr.requested_key = cd->key_seed++;
+
+			rc = fi_mr_regattr(cd->cd_domain, &core_attr,
+					   flags, &lm->lm_core_mr[i]);
+			if (rc)
+				goto err;
+
+			key = fi_mr_key(lm->lm_core_mr[i]);
+			memcpy((char *)lm->raw_key + sizeof(key) * i, &key,
+			       sizeof(key));
+		}
+	} else {
+		lm->key_size = sizeof(uint64_t);
+	}
+
 	*mr_fid = &mr->mr_fid;
 	ofi_atomic_inc32(&domain->ld_domain.ref);
 
 	return FI_SUCCESS;
+
+err:
+	/* Undo everything acquired above; the domain ref was not taken yet */
+	for (i = 0; i < domain->ld_num_doms; i++) {
+		if (lm->lm_core_mr[i])
+			fi_close(&lm->lm_core_mr[i]->fid);
+	}
+	free(lm->raw_key);
+	free(lm->lm_core_mr);
+	ofi_buf_free(lm);
+	return rc;
 }
+
+int lnx_mr_regv(struct fid *fid, const struct iovec *iov, size_t count,
+		uint64_t access, uint64_t offset, uint64_t requested_key,
+		uint64_t flags, struct fid_mr **mr, void *context)
+{
+	/* Zero the fields not assigned below before the attr is copied on */
+	struct fi_mr_attr attr = {0};
+
+	attr.mr_iov = iov;
+	attr.iov_count = count;
+	attr.access = access;
+	attr.offset = offset;
+	attr.requested_key = requested_key;
+	attr.context = context;
+	attr.iface = FI_HMEM_SYSTEM;
+	attr.device.reserved = 0;
+	attr.hmem_data = NULL;
+
+	return lnx_mr_regattr(fid, &attr, flags, mr);
+}
+
+int lnx_mr_reg(struct fid *fid, const void *buf, size_t len, uint64_t access,
+	       uint64_t offset, uint64_t requested_key, uint64_t flags,
+	       struct fid_mr **mr, void *context)
+{
+	struct iovec iov;
+
+	iov.iov_base = (void *) buf;
+	iov.iov_len = len;
+	return lnx_mr_regv(fid, &iov, 1, access, offset, requested_key, flags,
+			   mr,
context); +} diff --git a/prov/lnx/src/lnx_msg.c b/prov/lnx/src/lnx_msg.c index 8fa818cf8f1..285c1b9678d 100644 --- a/prov/lnx/src/lnx_msg.c +++ b/prov/lnx/src/lnx_msg.c @@ -44,7 +44,7 @@ ssize_t lnx_trecv(struct fid_ep *ep, void *buf, size_t len, void *desc, if (!lep) return -FI_ENOSYS; - return lnx_process_recv(lep, &iov, desc, src_addr, 1, tag, ignore, + return lnx_process_recv(lep, &iov, &desc, src_addr, 1, tag, ignore, context, lnx_ep_rx_flags(lep), true); } @@ -52,85 +52,73 @@ ssize_t lnx_trecvv(struct fid_ep *ep, const struct iovec *iov, void **desc, size_t count, fi_addr_t src_addr, uint64_t tag, uint64_t ignore, void *context) { - void *mr_desc; struct lnx_ep *lep; lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); if (!lep) return -FI_ENOSYS; - - if (count == 0) { - mr_desc = NULL; - } else if (iov && count >= 1 && - count <= lep->le_domain->ld_iov_limit) { - mr_desc = desc ? desc[0] : NULL; - } else { - FI_WARN(&lnx_prov, FI_LOG_CORE, "Invalid IOV\n"); - return -FI_EINVAL; - } - - return lnx_process_recv(lep, iov, mr_desc, src_addr, count, tag, ignore, + return lnx_process_recv(lep, iov, desc, src_addr, count, tag, ignore, context, lnx_ep_rx_flags(lep), true); } ssize_t lnx_trecvmsg(struct fid_ep *ep, const struct fi_msg_tagged *msg, uint64_t flags) { - void *mr_desc; struct lnx_ep *lep; lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); if (!lep) return -FI_ENOSYS; - if (msg->iov_count == 0) { - mr_desc = NULL; - } else if (msg->msg_iov && msg->iov_count >= 1 && - msg->iov_count <= lep->le_domain->ld_iov_limit) { - mr_desc = msg->desc ? 
msg->desc[0] : NULL; - } else { - FI_WARN(&lnx_prov, FI_LOG_CORE, "Invalid IOV\n"); - return -FI_EINVAL; - } - - return lnx_process_recv(lep, msg->msg_iov, mr_desc, msg->addr, + return lnx_process_recv(lep, msg->msg_iov, msg->desc, msg->addr, msg->iov_count, msg->tag, msg->ignore, msg->context, flags | lep->le_ep.rx_msg_flags, true); } -ssize_t lnx_tsend(struct fid_ep *ep, const void *buf, size_t len, void *desc, - fi_addr_t dest_addr, uint64_t tag, void *context) +static int lnx_send_lookup(struct fid_ep *ep, fi_addr_t dest_addr, void **desc, + size_t count, void **core_desc, fi_addr_t *core_addr, + struct lnx_core_ep **cep) { - int rc; struct lnx_ep *lep; - void *core_desc = NULL; - struct lnx_core_ep *cep; - fi_addr_t core_addr; + int rc; lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); if (!lep) return -FI_ENOSYS; - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, - &core_addr); + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, cep, + core_addr); if (rc) return rc; - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, buf, len); - + FI_DBG(&lnx_prov, FI_LOG_CORE, "sending to %" PRIx64 "\n", *core_addr); if (desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, desc, &core_desc); + rc = lnx_mr_regattr_core((*cep)->cep_domain, desc, count, + core_desc); if (rc) return rc; } + return 0; +} + +static ssize_t lnx_tsend(struct fid_ep *ep, const void *buf, size_t len, + void *desc, fi_addr_t dest_addr, uint64_t tag, + void *context) +{ + int rc; + void *core_desc = NULL; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + + rc = lnx_send_lookup(ep, dest_addr, &desc, 1, &core_desc, &core_addr, + &cep); + if (rc) + return rc; rc = fi_tsend(cep->cep_ep, buf, len, core_desc, core_addr, tag, context); - if (!rc) cep->cep_t_stats.st_num_tsend++; @@ -142,33 +130,17 @@ ssize_t lnx_tsendv(struct fid_ep *ep, const struct iovec *iov, void **desc, void *context) { int rc; - struct lnx_ep 
*lep; - void *core_desc = NULL; + void *core_desc[LNX_IOV_LIMIT] = {0}; struct lnx_core_ep *cep; fi_addr_t core_addr; - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, - &core_addr); + rc = lnx_send_lookup(ep, dest_addr, desc, count, core_desc, &core_addr, + &cep); if (rc) return rc; - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, iov->iov_base, iov->iov_len); - - if (desc && *desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, *desc, &core_desc); - if (rc) - return rc; - } - - rc = fi_tsendv(cep->cep_ep, iov, (core_desc) ? &core_desc : NULL, - count, core_addr, tag, context); - + rc = fi_tsendv(cep->cep_ep, iov, core_desc, count, core_addr, tag, + context); if (!rc) cep->cep_t_stats.st_num_tsendv++; @@ -179,19 +151,14 @@ ssize_t lnx_tsendmsg(struct fid_ep *ep, const struct fi_msg_tagged *msg, uint64_t flags) { int rc; - struct lnx_ep *lep; - void *core_desc = NULL; + void *core_desc[LNX_IOV_LIMIT] = {0}; struct lnx_core_ep *cep; struct fi_msg_tagged core_msg; memcpy(&core_msg, msg, sizeof(*msg)); - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, core_msg.addr, &cep, - &core_msg.addr); + rc = lnx_send_lookup(ep, msg->addr, msg->desc, msg->iov_count, + core_desc, &core_msg.addr, &cep); if (rc) return rc; @@ -199,16 +166,9 @@ ssize_t lnx_tsendmsg(struct fid_ep *ep, const struct fi_msg_tagged *msg, "sending to %" PRIx64 " tag %" PRIx64 "\n", core_msg.addr, core_msg.tag); - if (core_msg.desc && *core_msg.desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, *core_msg.desc, - &core_desc); - if (rc) - return rc; - core_msg.desc = &core_desc; - } + core_msg.desc = core_desc; rc = fi_tsendmsg(cep->cep_ep, &core_msg, flags); - if (!rc) cep->cep_t_stats.st_num_tsendmsg++; @@ -219,25 +179,14 @@ ssize_t 
lnx_tinject(struct fid_ep *ep, const void *buf, size_t len, fi_addr_t dest_addr, uint64_t tag) { int rc; - struct lnx_ep *lep; struct lnx_core_ep *cep; fi_addr_t core_addr; - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; - - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, - &core_addr); + rc = lnx_send_lookup(ep, dest_addr, NULL, 0, NULL, &core_addr, &cep); if (rc) return rc; - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, buf, len); - rc = fi_tinject(cep->cep_ep, buf, len, core_addr, tag); - if (!rc) cep->cep_t_stats.st_num_tinject++; @@ -249,64 +198,202 @@ ssize_t lnx_tsenddata(struct fid_ep *ep, const void *buf, size_t len, uint64_t tag, void *context) { int rc; - struct lnx_ep *lep; struct lnx_core_ep *cep; fi_addr_t core_addr; - void *core_desc = desc; + void *core_desc = NULL; + + rc = lnx_send_lookup(ep, dest_addr, &desc, 1, &core_desc, &core_addr, + &cep); + if (rc) + return rc; + + rc = fi_tsenddata(cep->cep_ep, buf, len, core_desc, data, core_addr, + tag, context); + if (!rc) + cep->cep_t_stats.st_num_tsenddata++; + + return rc; +} + +ssize_t lnx_tinjectdata(struct fid_ep *ep, const void *buf, size_t len, + uint64_t data, fi_addr_t dest_addr, uint64_t tag) +{ + int rc; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + + rc = lnx_send_lookup(ep, dest_addr, NULL, 0, NULL, &core_addr, &cep); + if (rc) + return rc; + + rc = fi_tinjectdata(cep->cep_ep, buf, len, data, core_addr, tag); + if (!rc) + cep->cep_t_stats.st_num_tinjectdata++; + + return rc; +} + +static ssize_t lnx_recv(struct fid_ep *ep, void *buf, size_t len, void *desc, + fi_addr_t src_addr, void *context) +{ + struct lnx_ep *lep; + const struct iovec iov = {.iov_base = buf, .iov_len = len}; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + return lnx_process_recv(lep, &iov, &desc, src_addr, 1, 0, 0, context, + 
lnx_ep_rx_flags(lep), false); +} + +static ssize_t lnx_recvv(struct fid_ep *ep, const struct iovec *iov, + void **desc, size_t count, fi_addr_t src_addr, + void *context) +{ + struct lnx_ep *lep; + + lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + return lnx_process_recv(lep, iov, desc, src_addr, count, 0, 0, + context, lnx_ep_rx_flags(lep), false); +} + +static ssize_t lnx_recvmsg(struct fid_ep *ep, const struct fi_msg *msg, + uint64_t flags) +{ + struct lnx_ep *lep; lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); if (!lep) return -FI_ENOSYS; - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, - &core_addr); + return lnx_process_recv(lep, msg->msg_iov, msg->desc, msg->addr, + msg->iov_count, 0, 0, msg->context, + flags | lep->le_ep.rx_msg_flags, false); +} + +static ssize_t lnx_send(struct fid_ep *ep, const void *buf, size_t len, + void *desc, fi_addr_t dest_addr, void *context) +{ + int rc; + void *core_desc = NULL; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + + rc = lnx_send_lookup(ep, dest_addr, &desc, 1, &core_desc, &core_addr, + &cep); if (rc) return rc; - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, buf, len); + rc = fi_send(cep->cep_ep, buf, len, core_desc, core_addr, context); + if (!rc) + cep->cep_t_stats.st_num_send++; - if (desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, desc, &core_desc); - if (rc) - return rc; - } + return rc; +} + +static ssize_t lnx_sendv(struct fid_ep *ep, const struct iovec *iov, + void **desc, size_t count, fi_addr_t dest_addr, + void *context) +{ + int rc; + void *core_desc[LNX_IOV_LIMIT] = {0}; + struct lnx_core_ep *cep; + fi_addr_t core_addr; - rc = fi_tsenddata(cep->cep_ep, buf, len, core_desc, - data, core_addr, tag, context); + rc = lnx_send_lookup(ep, dest_addr, desc, count, core_desc, &core_addr, + &cep); + if (rc) + return rc; + rc = fi_sendv(cep->cep_ep, iov, core_desc, count, 
core_addr, context); if (!rc) - cep->cep_t_stats.st_num_tsenddata++; + cep->cep_t_stats.st_num_sendv++; return rc; } -ssize_t lnx_tinjectdata(struct fid_ep *ep, const void *buf, size_t len, - uint64_t data, fi_addr_t dest_addr, uint64_t tag) +static ssize_t lnx_sendmsg(struct fid_ep *ep, const struct fi_msg *msg, + uint64_t flags) +{ + int rc; + void *core_desc[LNX_IOV_LIMIT] = {0}; + struct lnx_core_ep *cep; + struct fi_msg core_msg; + + memcpy(&core_msg, msg, sizeof(*msg)); + + rc = lnx_send_lookup(ep, msg->addr, msg->desc, msg->iov_count, + core_desc, &core_msg.addr, &cep); + if (rc) + return rc; + + core_msg.desc = core_desc; + + rc = fi_sendmsg(cep->cep_ep, &core_msg, flags); + if (!rc) + cep->cep_t_stats.st_num_sendmsg++; + + return rc; +} + +static ssize_t lnx_inject(struct fid_ep *ep, const void *buf, size_t len, + fi_addr_t dest_addr) { int rc; - struct lnx_ep *lep; struct lnx_core_ep *cep; fi_addr_t core_addr; - lep = container_of(ep, struct lnx_ep, le_ep.ep_fid.fid); - if (!lep) - return -FI_ENOSYS; + rc = lnx_send_lookup(ep, dest_addr, NULL, 0, NULL, &core_addr, &cep); + if (rc) + return rc; + + rc = fi_inject(cep->cep_ep, buf, len, core_addr); + if (!rc) + cep->cep_t_stats.st_num_inject++; + + return rc; +} - rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, - &core_addr); +static ssize_t lnx_senddata(struct fid_ep *ep, const void *buf, size_t len, + void *desc, uint64_t data, fi_addr_t dest_addr, + void *context) +{ + int rc; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + void *core_desc = NULL; + + rc = lnx_send_lookup(ep, dest_addr, &desc, 1, &core_desc, &core_addr, + &cep); if (rc) return rc; - FI_DBG(&lnx_prov, FI_LOG_CORE, - "sending to %" PRIx64 " tag %" PRIx64 " buf %p len %zu\n", - core_addr, tag, buf, len); + rc = fi_senddata(cep->cep_ep, buf, len, core_desc, data, core_addr, + context); + if (!rc) + cep->cep_t_stats.st_num_senddata++; - rc = fi_tinjectdata(cep->cep_ep, buf, len, data, core_addr, tag); + return rc; +} 
+static ssize_t lnx_injectdata(struct fid_ep *ep, const void *buf, size_t len,
+			      uint64_t data, fi_addr_t dest_addr)
+{
+	int rc;
+	struct lnx_core_ep *cep;
+	fi_addr_t core_addr;
+
+	rc = lnx_send_lookup(ep, dest_addr, NULL, 0, NULL, &core_addr, &cep);
+	if (rc)
+		return rc;
+
+	rc = fi_injectdata(cep->cep_ep, buf, len, data, core_addr);
 	if (!rc)
-		cep->cep_t_stats.st_num_tinjectdata++;
+		cep->cep_t_stats.st_num_injectdata++;
 
 	return rc;
 }
@@ -326,13 +413,13 @@ struct fi_ops_tagged lnx_tagged_ops = {
 
 struct fi_ops_msg lnx_msg_ops = {
 	.size = sizeof(struct fi_ops_msg),
-	.recv = fi_no_msg_recv,
-	.recvv = fi_no_msg_recvv,
-	.recvmsg = fi_no_msg_recvmsg,
-	.send = fi_no_msg_send,
-	.sendv = fi_no_msg_sendv,
-	.sendmsg = fi_no_msg_sendmsg,
-	.inject = fi_no_msg_inject,
-	.senddata = fi_no_msg_senddata,
-	.injectdata = fi_no_msg_injectdata,
+	.recv = lnx_recv,
+	.recvv = lnx_recvv,
+	.recvmsg = lnx_recvmsg,
+	.send = lnx_send,
+	.sendv = lnx_sendv,
+	.sendmsg = lnx_sendmsg,
+	.inject = lnx_inject,
+	.senddata = lnx_senddata,
+	.injectdata = lnx_injectdata,
 };
diff --git a/prov/lnx/src/lnx_rma.c b/prov/lnx/src/lnx_rma.c
new file mode 100644
index 00000000000..409ef065ff1
--- /dev/null
+++ b/prov/lnx/src/lnx_rma.c
@@ -0,0 +1,420 @@
+/*
+ * Copyright (c) Intel Corporation. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include "lnx.h"
+
+/*
+ * Translate application RMA iovs for the selected core endpoint: the key
+ * is taken from the mapped lnx_mr_key slot of the endpoint's core domain,
+ * and the address is made relative to base_addr when LNX uses
+ * FI_MR_VIRT_ADDR but the core provider does not.
+ */
+static void lnx_get_core_rma(struct lnx_core_ep *cep,
+			     const struct fi_rma_iov *iov,
+			     size_t rma_iov_count, struct fi_rma_iov *core_iov)
+{
+	struct lnx_mr_key *mr_key;
+	uint64_t core_mode = cep->cep_domain->cd_info->domain_attr->mr_mode;
+	uint64_t lnx_mode = cep->cep_parent->le_domain->ld_domain.mr_mode;
+	size_t i;
+
+	for (i = 0; i < rma_iov_count; i++) {
+		mr_key = (struct lnx_mr_key *)(uintptr_t) iov[i].key;
+		core_iov[i].key = mr_key->prov_keys[cep->cep_domain->idx];
+		if (core_mode & FI_MR_VIRT_ADDR ||
+		    !(lnx_mode & FI_MR_VIRT_ADDR))
+			core_iov[i].addr = iov[i].addr;
+		else
+			core_iov[i].addr = (uintptr_t) iov[i].addr -
+					   (uintptr_t) mr_key->base_addr;
+		core_iov[i].len = iov[i].len;
+	}
+}
+
+/* Pick a core endpoint for src_addr and forward the read to fi_read() */
+static ssize_t lnx_read(struct fid_ep *ep_fid, void *buf, size_t len,
+			void *desc, fi_addr_t src_addr, uint64_t addr,
+			uint64_t key, void *context)
+{
+	int rc;
+	struct lnx_ep *lep;
+	void *core_desc = NULL;
+	struct lnx_core_ep *cep;
+	fi_addr_t core_addr;
+	struct fi_rma_iov app_iov = {.addr = addr, .len = len, .key = key};
+	struct fi_rma_iov core_iov;
+
+	lep = container_of(ep_fid, struct lnx_ep, le_ep.ep_fid.fid);
+	if (!lep)
+		return -FI_ENOSYS;
+
+	rc = lnx_select_send_endpoints[lep->le_mr](lep, src_addr, &cep,
+						   &core_addr);
+	if (rc)
+		return rc;
+
+	FI_DBG(&lnx_prov, FI_LOG_CORE,
+	       "reading from %" PRIx64 " buf %p len %zu\n",
+	       core_addr, buf, len);
+
+	if (desc) {
+		rc = lnx_mr_regattr_core(cep->cep_domain, &desc, 1, &core_desc);
+		if (rc)
+			return rc;
+	}
+
+	lnx_get_core_rma(cep, &app_iov, 1, &core_iov);
+
+	rc = fi_read(cep->cep_ep, buf, len, core_desc, core_addr, core_iov.addr,
+		     core_iov.key, context);
+	if (!rc)
+		cep->cep_t_stats.st_num_read++;
+
+	return rc;
+}
+
+/* Vectored variant of lnx_read(); forwards to fi_readv() */
+static ssize_t lnx_readv(struct fid_ep *ep_fid, const struct iovec *iov,
+			 void **desc, size_t count, fi_addr_t src_addr,
+			 uint64_t addr, uint64_t key, void *context)
+{
+	int rc;
+	struct lnx_ep *lep;
+	void *core_desc[LNX_IOV_LIMIT] = {0};
+	struct lnx_core_ep *cep;
+	fi_addr_t core_addr;
+	struct fi_rma_iov app_iov = {.addr = addr, .len = 0, .key = key};
+	struct fi_rma_iov core_iov;
+
+	lep = container_of(ep_fid, struct lnx_ep, le_ep.ep_fid.fid);
+	if (!lep)
+		return -FI_ENOSYS;
+
+	rc = lnx_select_send_endpoints[lep->le_mr](lep, src_addr, &cep,
+						   &core_addr);
+	if (rc)
+		return rc;
+
+	FI_DBG(&lnx_prov, FI_LOG_CORE,
+	       "reading from %" PRIx64 " buf %p len %zu\n",
+	       core_addr, iov[0].iov_base, iov[0].iov_len);
+
+	if (desc) {
+		rc = lnx_mr_regattr_core(cep->cep_domain, desc, count,
+					 core_desc);
+		if (rc)
+			return rc;
+	}
+
+	lnx_get_core_rma(cep, &app_iov, 1, &core_iov);
+
+	rc = fi_readv(cep->cep_ep, iov, core_desc, count, core_addr,
+		      core_iov.addr, core_iov.key, context);
+	if (!rc)
+		cep->cep_t_stats.st_num_readv++;
+
+	return rc;
+}
+
+static ssize_t lnx_readmsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg,
+			   uint64_t flags)
+{
+	int rc;
+	struct lnx_ep *lep;
+	void *core_desc[LNX_IOV_LIMIT] = {0};
+	struct lnx_core_ep *cep;
+	fi_addr_t core_addr;
+	struct fi_rma_iov core_rma_iov[LNX_IOV_LIMIT];
+	struct fi_msg_rma core_msg = *msg;
+
+	lep = container_of(ep_fid, struct lnx_ep, le_ep.ep_fid.fid);
+	if (!lep)
+		return -FI_ENOSYS;
+
+	rc = lnx_select_send_endpoints[lep->le_mr](lep, msg->addr, &cep,
+						   &core_addr);
+
FI_DBG(&lnx_prov, FI_LOG_CORE, + "reading from %" PRIx64 " buf %p len %zu\n", + core_addr, msg->msg_iov[0].iov_base, msg->msg_iov[0].iov_len); + + if (msg->desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, msg->desc, + msg->iov_count, core_desc); + if (rc) + return rc; + } + + lnx_get_core_rma(cep, msg->rma_iov, msg->rma_iov_count, core_rma_iov); + core_msg.rma_iov = core_rma_iov; + core_msg.desc = core_desc; + + rc = fi_readmsg(cep->cep_ep, &core_msg, flags); + if (!rc) + cep->cep_t_stats.st_num_readmsg++; + + return rc; +} + +static ssize_t lnx_write(struct fid_ep *ep_fid, const void *buf, size_t len, + void *desc, fi_addr_t dest_addr, uint64_t addr, + uint64_t key, void *context) +{ + int rc; + struct lnx_ep *lep; + void *core_desc = NULL; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + struct fi_rma_iov app_iov = {.addr = addr, .len = len, .key = key}; + struct fi_rma_iov core_iov; + + lep = container_of(ep_fid, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "writing to %" PRIx64 " buf %p len %zu\n", + core_addr, buf, len); + + if (desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, &desc, 1, &core_desc); + if (rc) + return rc; + } + + lnx_get_core_rma(cep, &app_iov, 1, &core_iov); + + rc = fi_write(cep->cep_ep, buf, len, core_desc, core_addr, + core_iov.addr, core_iov.key, context); + if (!rc) + cep->cep_t_stats.st_num_write++; + + return rc; +} + +static ssize_t lnx_writev(struct fid_ep *ep_fid, const struct iovec *iov, + void **desc, size_t count, fi_addr_t dest_addr, + uint64_t addr, uint64_t key, void *context) +{ + int rc; + struct lnx_ep *lep; + void *core_desc[LNX_IOV_LIMIT] = {0}; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + struct fi_rma_iov app_iov = {.addr = addr, .len = 0, .key = key}; + struct fi_rma_iov core_iov; + + lep = container_of(ep_fid, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + 
return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "writing to %" PRIx64 " buf %p len %zu\n", + core_addr, iov[0].iov_base, iov[0].iov_len); + + if (desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, desc, count, + core_desc); + if (rc) + return rc; + } + + lnx_get_core_rma(cep, &app_iov, 1, &core_iov); + + rc = fi_writev(cep->cep_ep, iov, core_desc, count, core_addr, + core_iov.addr, core_iov.key, context); + if (!rc) + cep->cep_t_stats.st_num_writev++; + + return rc; +} + + +static ssize_t lnx_writemsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg, + uint64_t flags) +{ + int rc; + struct lnx_ep *lep; + void *core_desc[LNX_IOV_LIMIT] = {0}; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + struct fi_msg_rma core_msg = *msg; + struct fi_rma_iov core_rma_iov[LNX_IOV_LIMIT]; + + lep = container_of(ep_fid, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, msg->addr, &cep, + &core_addr); + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "writing to %" PRIx64 " buf %p len %zu\n", + core_addr, msg->msg_iov[0].iov_base, msg->msg_iov[0].iov_len); + + if (msg->desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, msg->desc, + msg->iov_count, core_desc); + if (rc) + return rc; + } + lnx_get_core_rma(cep, msg->rma_iov, msg->rma_iov_count, core_rma_iov); + core_msg.rma_iov = core_rma_iov; + core_msg.desc = core_desc; + + rc = fi_writemsg(cep->cep_ep, &core_msg, flags); + if (!rc) + cep->cep_t_stats.st_num_writemsg++; + + return rc; +} + +static ssize_t lnx_writedata(struct fid_ep *ep_fid, const void *buf, size_t len, + void *desc, uint64_t data, fi_addr_t dest_addr, + uint64_t addr, uint64_t key, void *context) +{ + int rc; + struct lnx_ep *lep; + void *core_desc = NULL; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + struct fi_rma_iov app_iov = {.addr = addr, .len = len, .key = key}; + + lep = container_of(ep_fid, struct 
lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "writing to %" PRIx64 " buf %p len %zu\n", + core_addr, buf, len); + + if (desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, &desc, 1, &core_desc); + if (rc) + return rc; + } + + lnx_get_core_rma(cep, &app_iov, 1, &app_iov); + + rc = fi_writedata(cep->cep_ep, buf, len, core_desc, data, core_addr, + app_iov.addr, app_iov.key, context); + if (!rc) + cep->cep_t_stats.st_num_writedata++; + + return rc; +} + +static ssize_t lnx_rma_inject(struct fid_ep *ep_fid, const void *buf, + size_t len, fi_addr_t dest_addr, uint64_t addr, + uint64_t key) +{ + int rc; + struct lnx_ep *lep; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + struct fi_rma_iov app_iov = {.addr = addr, .len = len, .key = key}; + struct fi_rma_iov core_iov; + + lep = container_of(ep_fid, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "writing to %" PRIx64 " buf %p len %zu\n", + core_addr, buf, len); + + lnx_get_core_rma(cep, &app_iov, 1, &core_iov); + rc = fi_inject_write(cep->cep_ep, buf, len, core_addr, core_iov.addr, + core_iov.key); + if (!rc) + cep->cep_t_stats.st_num_inject_write++; + + return rc; +} + +static ssize_t lnx_inject_writedata(struct fid_ep *ep_fid, const void *buf, + size_t len, uint64_t data, + fi_addr_t dest_addr, uint64_t addr, + uint64_t key) +{ + int rc; + struct lnx_ep *lep; + struct lnx_core_ep *cep; + fi_addr_t core_addr; + struct fi_rma_iov app_iov = {.addr = addr, .len = len, .key = key}; + struct fi_rma_iov core_iov; + + lep = container_of(ep_fid, struct lnx_ep, le_ep.ep_fid.fid); + if (!lep) + return -FI_ENOSYS; + + rc = lnx_select_send_endpoints[lep->le_mr](lep, dest_addr, &cep, + &core_addr); + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "writing to %" PRIx64 " 
buf %p len %zu\n", + core_addr, buf, len); + + lnx_get_core_rma(cep, &app_iov, 1, &core_iov); + + rc = fi_inject_writedata(cep->cep_ep, buf, len, data, core_addr, + core_iov.addr, core_iov.key); + if (!rc) + cep->cep_t_stats.st_num_inject_writedata++; + + return rc; +} + +struct fi_ops_rma lnx_rma_ops = { + .size = sizeof(struct fi_ops_rma), + .read = lnx_read, + .readv = lnx_readv, + .readmsg = lnx_readmsg, + .write = lnx_write, + .writev = lnx_writev, + .writemsg = lnx_writemsg, + .inject = lnx_rma_inject, + .writedata = lnx_writedata, + .injectdata = lnx_inject_writedata, +}; diff --git a/prov/lnx/src/lnx_srx.c b/prov/lnx/src/lnx_srx.c index 0b3da0053c6..e046c8dbeb9 100644 --- a/prov/lnx/src/lnx_srx.c +++ b/prov/lnx/src/lnx_srx.c @@ -34,33 +34,15 @@ #include "lnx.h" #include "ofi_iov.h" -static int lnx_get_msg(struct fid_peer_srx *srx, - struct fi_peer_match_attr *match, - struct fi_peer_rx_entry **entry) -{ - return -FI_ENOSYS; -} - -static int lnx_queue_msg(struct fi_peer_rx_entry *entry) -{ - return -FI_ENOSYS; -} - static void lnx_free_entry(struct fi_peer_rx_entry *entry) { struct lnx_rx_entry *rx_entry; - ofi_spin_t *bplock; rx_entry = container_of(entry, struct lnx_rx_entry, rx_entry); - if (rx_entry->rx_global) - bplock = &global_bplock; - else - bplock = &rx_entry->rx_lep->le_bplock; - - ofi_spin_lock(bplock); + ofi_genlock_lock(&rx_entry->rx_lep->le_ep.lock); ofi_buf_free(rx_entry); - ofi_spin_unlock(bplock); + ofi_genlock_unlock(&rx_entry->rx_lep->le_ep.lock); } static void lnx_init_rx_entry(struct lnx_rx_entry *entry, @@ -80,7 +62,7 @@ static void lnx_init_rx_entry(struct lnx_rx_entry *entry, entry->rx_entry.addr = addr; entry->rx_entry.context = context; entry->rx_entry.tag = tag; - entry->rx_entry.flags = flags | FI_TAGGED | FI_RECV; + entry->rx_entry.flags = flags; entry->rx_ignore = ignore; } @@ -91,31 +73,16 @@ static struct lnx_rx_entry *get_rx_entry(struct lnx_ep *lep, void *context, uint64_t flags) { struct lnx_rx_entry *rx_entry = NULL; - 
ofi_spin_t *bplock; - struct ofi_bufpool *bp; - /* if lp is NULL, then we don't know where the message is going to - * come from, so allocate the rx_entry from a global pool - */ - if (!lep) { - bp = global_recv_bp; - bplock = &global_bplock; - } else { - bp = lep->le_recv_bp; - bplock = &lep->le_bplock; - } + ofi_genlock_lock(&lep->le_ep.lock); + rx_entry = (struct lnx_rx_entry *)ofi_buf_alloc(lep->le_recv_bp); + ofi_genlock_unlock(&lep->le_ep.lock); + if (!rx_entry) + return NULL; - ofi_spin_lock(bplock); - rx_entry = (struct lnx_rx_entry *)ofi_buf_alloc(bp); - ofi_spin_unlock(bplock); - if (rx_entry) { - memset(rx_entry, 0, sizeof(*rx_entry)); - if (!lep) - rx_entry->rx_global = true; - rx_entry->rx_lep = lep; - lnx_init_rx_entry(rx_entry, iov, desc, count, addr, tag, - ignore, context, flags); - } + rx_entry->rx_lep = lep; + lnx_init_rx_entry(rx_entry, iov, desc, count, addr, tag, ignore, + context, flags); return rx_entry; } @@ -238,9 +205,79 @@ static int lnx_get_tag(struct fid_peer_srx *srx, cep->cep_t_stats.st_num_posted_recvs++; rx_entry->rx_entry.addr = lnx_get_core_addr(cep, addr); - if (rx_entry->rx_entry.desc && *rx_entry->rx_entry.desc) { + if (rx_entry->rx_entry.desc) { + rc = lnx_mr_regattr_core(cep->cep_domain, + rx_entry->rx_entry.desc, + rx_entry->rx_entry.count, + rx_entry->rx_entry.desc); + if (rc) + return rc; + } +finalize: + rx_entry->rx_entry.msg_size = match->msg_size; + *entry = &rx_entry->rx_entry; + +out: + return rc; +} + + +static int lnx_get_msg(struct fid_peer_srx *srx, + struct fi_peer_match_attr *match, + struct fi_peer_rx_entry **entry) +{ + struct lnx_match_attr match_attr = {0}; + struct lnx_peer_srq *lnx_srq; + struct lnx_core_ep *cep; + struct lnx_ep *lep; + struct lnx_rx_entry *rx_entry; + fi_addr_t addr = match->addr; + int rc = 0; + + cep = srx->ep_fid.fid.context; + lep = cep->cep_parent; + lnx_srq = &lep->le_srq; + + match_attr.lm_addr = addr; + + rx_entry = lnx_remove_first_match(&lnx_srq->lps_recv.lqp_recvq, + 
&match_attr); + if (rx_entry) { + FI_DBG(&lnx_prov, FI_LOG_CORE, + "addr = %" PRIx64 " found\n", match_attr.lm_addr); + + goto assign; + } + + FI_DBG(&lnx_prov, FI_LOG_CORE, + "addr = %" PRIx64 " not found\n", match_attr.lm_addr); + + rx_entry = get_rx_entry(lep, NULL, NULL, 0, addr, 0, 0, NULL, + FI_MSG | FI_RECV); + if (!rx_entry) { + rc = -FI_ENOMEM; + goto out; + } + + rx_entry->rx_entry.owner_context = lnx_srq; + rx_entry->rx_cep = cep; + + rc = -FI_ENOENT; + + cep->cep_t_stats.st_num_unexp_msgs++; + + goto finalize; + +assign: + lnx_set_send_pair[lep->le_mr](lep, cep, addr); + + cep->cep_t_stats.st_num_posted_recvs++; + + rx_entry->rx_entry.addr = lnx_get_core_addr(cep, addr); + if (rx_entry->rx_entry.desc) { rc = lnx_mr_regattr_core(cep->cep_domain, - *rx_entry->rx_entry.desc, + rx_entry->rx_entry.desc, + rx_entry->rx_entry.count, rx_entry->rx_entry.desc); if (rc) return rc; @@ -253,6 +290,22 @@ static int lnx_get_tag(struct fid_peer_srx *srx, return rc; } +static int lnx_queue_msg(struct fi_peer_rx_entry *entry) +{ + struct lnx_rx_entry *rx_entry; + struct lnx_peer_srq *lnx_srq = + (struct lnx_peer_srq*)entry->owner_context; + + rx_entry = container_of(entry, struct lnx_rx_entry, rx_entry); + FI_DBG(&lnx_prov, FI_LOG_CORE, + "addr = %" PRIx64 " found\n", entry->addr); + + lnx_insert_rx_entry(&lnx_srq->lps_recv.lqp_unexq, rx_entry); + + return 0; +} + + static void lnx_update_msg_entries( struct lnx_qpair *qp, fi_addr_t (*get_addr)(struct fi_peer_rx_entry *)) @@ -316,13 +369,14 @@ static int lnx_discard(struct lnx_ep *lep, struct lnx_rx_entry *rx_entry, } static int lnx_peek(struct lnx_ep *lep, struct lnx_match_attr *match_attr, - void *context, uint64_t flags) + void *context, uint64_t flags, bool tagged) { int rc; struct lnx_rx_entry *rx_entry; struct lnx_peer_srq *lnx_srq = &lep->le_srq; - rx_entry = lnx_find_first_match(&lnx_srq->lps_trecv.lqp_unexq, + rx_entry = lnx_find_first_match(tagged ? 
&lnx_srq->lps_trecv.lqp_unexq : + &lnx_srq->lps_recv.lqp_unexq, match_attr); if (!rx_entry) { FI_DBG(&lnx_prov, FI_LOG_CORE, @@ -340,7 +394,11 @@ static int lnx_peek(struct lnx_ep *lep, struct lnx_match_attr *match_attr, rx_entry->rx_entry.tag); if (flags & FI_DISCARD) { - rc = rx_entry->rx_cep->cep_srx.peer_ops->discard_tag( + if (tagged) + rc = rx_entry->rx_cep->cep_srx.peer_ops->discard_tag( + &rx_entry->rx_entry); + else + rc = rx_entry->rx_cep->cep_srx.peer_ops->discard_msg( &rx_entry->rx_entry); dlist_remove(&rx_entry->entry); lnx_free_entry(&rx_entry->rx_entry); @@ -372,7 +430,7 @@ static int lnx_peek(struct lnx_ep *lep, struct lnx_match_attr *match_attr, * If nothing is found on the unexpected messages, then add a receive * request on the SRQ; happens in the lnx_process_recv() */ -int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, +int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void **desc, fi_addr_t addr, size_t count, uint64_t tag, uint64_t ignore, void *context, uint64_t flags, bool tagged) @@ -382,8 +440,12 @@ int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, struct lnx_match_attr match_attr; struct lnx_core_ep *cep; int rc = 0; + uint64_t rx_flags = tagged ? FI_TAGGED | FI_RECV : FI_MSG | FI_RECV; + struct lnx_qpair *qp; fi_addr_t sub_addr, encoded_addr = lnx_encode_fi_addr(addr, 0); + //TODO fix locking + qp = tagged ? &lnx_srq->lps_trecv : &lnx_srq->lps_recv; /* Matching format should always be in the encoded form */ match_attr.lm_addr = (addr == FI_ADDR_UNSPEC) || !(lep->le_ep.caps & FI_DIRECTED_RECV) ? 
@@ -392,7 +454,7 @@ int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, match_attr.lm_tag = tag; if (flags & FI_PEEK) - return lnx_peek(lep, &match_attr, context, flags); + return lnx_peek(lep, &match_attr, context, flags, tagged); if (flags & FI_DISCARD) { rx_entry = (struct lnx_rx_entry *) @@ -404,8 +466,7 @@ int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, rx_entry = (struct lnx_rx_entry *) (((struct fi_context *)context)->internal[0]); } else { - rx_entry = lnx_remove_first_match(&lnx_srq->lps_trecv.lqp_unexq, - &match_attr); + rx_entry = lnx_remove_first_match(&qp->lqp_unexq, &match_attr); if (!rx_entry) { FI_DBG(&lnx_prov, FI_LOG_CORE, "addr=%" PRIx64 " tag=%" PRIx64 " ignore=%" @@ -426,12 +487,12 @@ int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, cep = rx_entry->rx_cep; sub_addr = lnx_get_core_addr(cep, rx_entry->rx_entry.addr); lnx_init_rx_entry(rx_entry, iov, desc, count, sub_addr, tag, ignore, - context, rx_entry->rx_entry.flags | flags); + context, rx_entry->rx_entry.flags | flags | rx_flags); rx_entry->rx_entry.msg_size = MIN(ofi_total_iov_len(iov, count), rx_entry->rx_entry.msg_size); if (desc) { - rc = lnx_mr_regattr_core(cep->cep_domain, desc, + rc = lnx_mr_regattr_core(cep->cep_domain, desc, count, rx_entry->rx_entry.desc); if (rc) return rc; @@ -444,21 +505,11 @@ int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, else rc = cep->cep_srx.peer_ops->start_msg(&rx_entry->rx_entry); - if (rc == -FI_EINPROGRESS) { - /* this is telling me that more messages can match the same - * rx_entry. 
So keep it on the queue - */ - FI_DBG(&lnx_prov, FI_LOG_CORE, - "addr = %" PRIx64 " tag = %" PRIx64 " ignore = %" PRIx64 - " start_tag() in progress\n", - addr, tag, ignore); - - goto insert_recvq; - } if (rc) FI_WARN(&lnx_prov, FI_LOG_CORE, "start tag failed with %d\n", rc); + //TODO fix MULTI_RECV FI_DBG(&lnx_prov, FI_LOG_CORE, "addr = %" PRIx64 " tag = %" PRIx64 " ignore = %" PRIx64 " start_tag() success\n", @@ -470,17 +521,13 @@ int lnx_process_recv(struct lnx_ep *lep, const struct iovec *iov, void *desc, /* nothing on the unexpected queue, then allocate one and put it on * the receive queue */ - rx_entry = get_rx_entry(NULL, iov, (desc) ? &desc : NULL, count, - match_attr.lm_addr, tag, ignore, context, - flags); + rx_entry = get_rx_entry(lep, iov, desc, count, match_attr.lm_addr, + tag, ignore, context, flags | rx_flags); if (!rx_entry) { rc = -FI_ENOMEM; goto out; } - -insert_recvq: - lnx_insert_rx_entry(&lnx_srq->lps_trecv.lqp_recvq, rx_entry); - + lnx_insert_rx_entry(&qp->lqp_recvq, rx_entry); out: return rc; }