diff --git a/prov/shm/src/smr.h b/prov/shm/src/smr.h index cc05852b43b..f636d5626b5 100644 --- a/prov/shm/src/smr.h +++ b/prov/shm/src/smr.h @@ -53,7 +53,6 @@ struct smr_ep { struct slist overflow_list; struct dlist_entry sar_list; struct dlist_entry async_cpy_list; - struct dlist_entry unexp_cmd_list; size_t min_multi_recv_size; int ep_idx; @@ -256,20 +255,21 @@ void smr_format_tx_pend(struct smr_pend_entry *pend, struct smr_cmd *cmd, uint64_t op_flags); void smr_generic_format(struct smr_cmd *cmd, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, - uint64_t op_flags); + uint8_t smr_flags); size_t smr_copy_to_sar(struct smr_ep *ep, struct smr_region *smr, struct smr_pend_entry *pend); size_t smr_copy_from_sar(struct smr_ep *ep, struct smr_region *smr, struct smr_pend_entry *pend); int smr_select_proto(void **desc, size_t iov_count, bool cma_avail, bool ipc_valid, uint32_t op, uint64_t total_len, - uint64_t op_flags); + uint64_t op_flags, uint8_t *smr_flags); typedef ssize_t (*smr_send_func)( struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, - uint64_t data, uint64_t op_flags, struct ofi_mr **desc, - const struct iovec *iov, size_t iov_count, size_t total_len, - void *context, struct smr_cmd *cmd); + uint64_t data, uint64_t op_flags, uint8_t smr_flags, + struct ofi_mr **desc, const struct iovec *iov, + size_t iov_count, size_t total_len, void *context, + struct smr_cmd *cmd); extern smr_send_func smr_send_ops[smr_proto_max]; int smr_write_err_comp(struct util_cq *cq, void *context, @@ -280,9 +280,9 @@ int smr_complete_rx(struct smr_ep *ep, void *context, uint32_t op, uint64_t flags, size_t len, void *buf, int64_t id, uint64_t tag, uint64_t data); -static inline uint64_t smr_rx_cq_flags(uint64_t rx_flags, uint16_t op_flags) +static inline uint64_t smr_rx_cq_flags(uint64_t rx_flags, uint8_t smr_flags) { - if (op_flags & SMR_REMOTE_CQ_DATA) + if (smr_flags & SMR_REMOTE_CQ_DATA) rx_flags |= FI_REMOTE_CQ_DATA; return rx_flags; } diff --git a/prov/shm/src/smr_atomic.c b/prov/shm/src/smr_atomic.c index acd062397a3..d54144896b9 100644 --- a/prov/shm/src/smr_atomic.c +++ b/prov/shm/src/smr_atomic.c @@ -69,7 +69,7 @@ static void smr_do_atomic_inline( smr_format_inline_atomic(cmd, desc, iov, iov_count); } -static void smr_format_inject_atomic( +static int smr_format_inject_atomic( struct smr_cmd *cmd, struct ofi_mr **desc, const struct iovec *iov, size_t count, const struct iovec *resultv, size_t result_count, @@ -80,8 +80,17 @@ static void smr_format_inject_atomic( size_t comp_size; cmd->hdr.proto = smr_proto_inject; + tx_buf = smr_get_inject_buf(smr); + if (!tx_buf) { + FI_DBG(&smr_prov, FI_LOG_EP_DATA, + "No inject buffers available, cannot send inject " + "message\n"); + return -FI_EAGAIN; + } - tx_buf = smr_get_inject_buf(smr, cmd); + cmd->data.inject_buf_index = smr_freestack_get_index( + smr_inject_pool(smr), + (char *)tx_buf); switch (cmd->hdr.op) { case ofi_op_atomic: cmd->hdr.size = ofi_copy_from_mr_iov( @@ -112,9 +121,11 @@ static void smr_format_inject_atomic( assert(0); break; } + + return FI_SUCCESS; } -static ssize_t smr_do_atomic_inject( +static int smr_do_atomic_inject( struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t op_flags, uint8_t datatype, uint8_t atomic_op, @@ -123,18 +134,20 @@ static ssize_t smr_do_atomic_inject( const struct iovec *resultv, size_t result_count, struct ofi_mr **comp_desc, const struct iovec *compv, size_t comp_count, size_t total_len, void *context, - uint16_t smr_flags, struct smr_cmd *cmd) + uint8_t smr_flags, struct smr_cmd *cmd) { struct smr_pend_entry *pend; + int ret; - smr_generic_format(cmd, tx_id, rx_id, op, 0, 0, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, 0, 0, smr_flags); smr_generic_atomic_format(cmd, datatype, atomic_op); - smr_format_inject_atomic(cmd, desc, iov, iov_count, resultv, - result_count, comp_desc, compv, comp_count, - ep->region); + ret = smr_format_inject_atomic(cmd, desc, iov, iov_count, resultv, + result_count, comp_desc, compv, + comp_count, peer_smr); + if (ret) + return ret; - if (op == ofi_op_atomic_fetch || op == ofi_op_atomic_compare || - atomic_op == FI_ATOMIC_READ || op_flags & FI_DELIVERY_COMPLETE) { + if (smr_flags & SMR_RETURN_CMD) { pend = ofi_buf_alloc(ep->pend_pool); assert(pend); cmd->hdr.tx_ctx = (uintptr_t) pend; @@ -148,13 +161,18 @@ static ssize_t smr_do_atomic_inject( } static int smr_select_atomic_proto(uint32_t op, uint64_t total_len, - uint64_t op_flags) + uint64_t op_flags, uint8_t *smr_flags) { + *smr_flags = 0; + assert(!(op_flags & FI_REMOTE_CQ_DATA)); if (op == ofi_op_atomic_compare || op == ofi_op_atomic_fetch || - op_flags & FI_DELIVERY_COMPLETE || total_len > SMR_MSG_DATA_LEN) + op_flags & FI_DELIVERY_COMPLETE) { + *smr_flags |= SMR_RETURN_CMD; return smr_proto_inject; + } - return smr_proto_inline; + return total_len > SMR_MSG_DATA_LEN ? smr_proto_inject : + smr_proto_inline; } static ssize_t smr_generic_atomic( @@ -174,12 +192,12 @@ static ssize_t smr_generic_atomic( struct iovec iov[SMR_IOV_LIMIT]; struct iovec compare_iov[SMR_IOV_LIMIT]; struct iovec result_iov[SMR_IOV_LIMIT]; - uint16_t smr_flags = 0; int64_t tx_id, rx_id, pos; int proto; ssize_t ret; size_t total_len; uint64_t atomic_flags; + uint8_t smr_flags; assert(count <= SMR_IOV_LIMIT); assert(result_count <= SMR_IOV_LIMIT); @@ -248,15 +266,8 @@ static ssize_t smr_generic_atomic( break; } - proto = smr_select_atomic_proto(op, total_len, op_flags); - - if (proto == smr_proto_inline) { - cmd = &ce->cmd; - smr_do_atomic_inline(ep, peer_smr, tx_id, rx_id, ofi_op_atomic, - op_flags, datatype, atomic_op, - (struct ofi_mr **) desc, iov, count, - total_len, cmd); - } else { + proto = smr_select_atomic_proto(op, total_len, op_flags, &smr_flags); + if (smr_flags & SMR_RETURN_CMD) { if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { smr_cmd_queue_discard(ce, pos); ret = -FI_EAGAIN; @@ -266,7 +277,17 @@ static ssize_t smr_generic_atomic( cmd = smr_freestack_pop(smr_cmd_stack(ep->region)); assert(cmd); ce->ptr = smr_local_to_peer(ep, peer_smr, tx_id, rx_id, - (uintptr_t) cmd); + (uintptr_t) cmd); + } else { + cmd = &ce->cmd; + } + + if (proto == smr_proto_inline) { + smr_do_atomic_inline(ep, peer_smr, tx_id, rx_id, ofi_op_atomic, + op_flags, datatype, atomic_op, + (struct ofi_mr **) desc, iov, count, + total_len, cmd); + } else { ret = smr_do_atomic_inject(ep, peer_smr, tx_id, rx_id, op, op_flags, datatype, atomic_op, (struct ofi_mr **) desc, iov, count, @@ -281,16 +302,17 @@ static ssize_t smr_generic_atomic( } } - if (!cmd->hdr.tx_ctx) { - ret = smr_complete_tx(ep, context, op, op_flags); - if (ret) { - FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "unable to process tx completion\n"); - } - } - smr_format_rma_ioc(cmd, rma_ioc, rma_count); smr_cmd_queue_commit(ce, pos); + + if (smr_flags & SMR_RETURN_CMD) + goto unlock; + + ret = smr_complete_tx(ep, context, op, op_flags); + if (ret) + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "unable to process tx completion\n"); + unlock: ofi_genlock_unlock(&ep->util_ep.lock); return ret; @@ -371,7 +393,6 @@ static ssize_t smr_atomic_inject( int64_t id, peer_id, pos; ssize_t ret; size_t total_len; - int proto; ep = container_of(ep_fid, struct smr_ep, util_ep.ep_fid.fid); @@ -412,24 +433,12 @@ static ssize_t smr_atomic_inject( rma_ioc.count = count; rma_ioc.key = key; + cmd = &ce->cmd; if (total_len <= SMR_MSG_DATA_LEN) { - proto = smr_proto_inline; - cmd = &ce->cmd; smr_do_atomic_inline(ep, peer_smr, id, peer_id, ofi_op_atomic, 0, datatype, op, NULL, &iov, 1, total_len, &ce->cmd); } else { - proto = smr_proto_inject; - if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { - smr_cmd_queue_discard(ce, pos); - ret = -FI_EAGAIN; - goto unlock; - } - - cmd = smr_freestack_pop(smr_cmd_stack(ep->region)); - assert(cmd); - ce->ptr = smr_local_to_peer(ep, peer_smr, id, peer_id, - (uintptr_t) cmd); ret = smr_do_atomic_inject(ep, peer_smr, id, peer_id, ofi_op_atomic, 0, datatype, op, NULL, &iov, 1, NULL, NULL, 0, NULL, NULL, @@ -442,9 +451,7 @@ static ssize_t smr_atomic_inject( smr_format_rma_ioc(cmd, &rma_ioc, 1); smr_cmd_queue_commit(ce, pos); - - if (proto == smr_proto_inline) - ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_atomic); + ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_atomic); unlock: ofi_genlock_unlock(&ep->util_ep.lock); return ret; diff --git a/prov/shm/src/smr_ep.c b/prov/shm/src/smr_ep.c index 2e1c64a566b..5248d969167 100644 --- a/prov/shm/src/smr_ep.c +++ b/prov/shm/src/smr_ep.c @@ -245,19 +245,16 @@ void smr_format_tx_pend(struct smr_pend_entry *pend, struct smr_cmd *cmd, void smr_generic_format(struct smr_cmd *cmd, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, - uint64_t op_flags) + uint8_t smr_flags) { cmd->hdr.op = op; cmd->hdr.status = 0; - cmd->hdr.op_flags = 0; + cmd->hdr.smr_flags = smr_flags; cmd->hdr.tag = tag; cmd->hdr.tx_id = tx_id; cmd->hdr.rx_id = rx_id; cmd->hdr.cq_data = data; cmd->hdr.rx_ctx = 0; - - if (op_flags & FI_REMOTE_CQ_DATA) - cmd->hdr.op_flags |= SMR_REMOTE_CQ_DATA; } static void smr_format_inline(struct smr_cmd *cmd, struct ofi_mr **mr, @@ -268,24 +265,33 @@ static void smr_format_inline(struct smr_cmd *cmd, struct ofi_mr **mr, mr, iov, count, 0); } -static void smr_format_inject(struct smr_ep *ep, struct smr_cmd *cmd, - struct smr_pend_entry *pend) +static int smr_format_inject(struct smr_ep *ep, struct smr_region *peer_smr, + struct smr_cmd *cmd, struct ofi_mr **mr, + const struct iovec *iov, uint32_t iov_count, + uint64_t op_flags) { struct smr_inject_buf *tx_buf; - tx_buf = smr_get_inject_buf(ep->region, cmd); - cmd->hdr.proto = smr_proto_inject; - if (cmd->hdr.op != ofi_op_read_req) { + tx_buf = smr_get_inject_buf(peer_smr); + if (!tx_buf) { + FI_DBG(&smr_prov, FI_LOG_EP_DATA, + "No inject buffers available, cannot send inject " + "message\n"); + return -FI_EAGAIN; + } + + cmd->data.inject_buf_index = smr_freestack_get_index( + smr_inject_pool(peer_smr), + (char *)tx_buf); + if (cmd->hdr.op != ofi_op_read_req) cmd->hdr.size = ofi_copy_from_mr_iov(tx_buf->data, SMR_INJECT_SIZE, - pend->mr, pend->iov, - pend->iov_count, 0); - pend->bytes_done = cmd->hdr.size; - } else { - cmd->hdr.size = ofi_total_iov_len(pend->iov, pend->iov_count); - pend->bytes_done = 0; - } + mr, iov, iov_count, 0); + else + cmd->hdr.size = ofi_total_iov_len(iov, iov_count); + + return FI_SUCCESS; } static void smr_format_iov(struct smr_cmd *cmd, struct smr_pend_entry *pend) @@ -379,9 +385,8 @@ static int smr_format_sar(struct smr_ep *ep, struct smr_cmd *cmd, ret = pend->sar_copy_fn(ep, pend); if (ret < 0 && ret != -FI_EBUSY) { for (i = cmd->data.buf_batch_size - 1; i >= 0; i--) { - smr_freestack_push_by_index( - smr_sar_pool(ep->region), - cmd->data.sar[i]); + smr_return_sar_buf_by_index(ep->region, + cmd->data.sar[i]); } return -FI_EAGAIN; } @@ -398,12 +403,15 @@ static int smr_format_sar(struct smr_ep *ep, struct smr_cmd *cmd, int smr_select_proto(void **desc, size_t iov_count, bool vma_avail, bool ipc_valid, uint32_t op, uint64_t total_len, - uint64_t op_flags) + uint64_t op_flags, uint8_t *smr_flags) { struct ofi_mr *smr_desc; enum fi_hmem_iface iface = FI_HMEM_SYSTEM; bool fastcopy_avail = false, use_ipc = false; + *smr_flags = 0; + *smr_flags |= (op_flags & FI_DELIVERY_COMPLETE) ? SMR_RETURN_CMD : 0; + *smr_flags |= (op_flags & FI_REMOTE_CQ_DATA) ? SMR_REMOTE_CQ_DATA : 0; /* Do not inline/inject if IPC is available so device to device * transfer may occur if possible. */ if (iov_count == 1 && desc && desc[0] && ipc_valid) { @@ -420,6 +428,7 @@ int smr_select_proto(void **desc, size_t iov_count, bool vma_avail, } if (op == ofi_op_read_req) { + *smr_flags |= SMR_RETURN_CMD; if (use_ipc) return smr_proto_ipc; if (total_len <= SMR_INJECT_SIZE) @@ -429,32 +438,41 @@ int smr_select_proto(void **desc, size_t iov_count, bool vma_avail, return smr_proto_sar; } - if (fastcopy_avail && total_len <= smr_env.max_gdrcopy_size) - return total_len <= SMR_MSG_DATA_LEN ? smr_proto_inline : - smr_proto_inject; + if (fastcopy_avail && total_len <= smr_env.max_gdrcopy_size) { + if (total_len <= SMR_MSG_DATA_LEN && + !(op_flags & FI_DELIVERY_COMPLETE)) + return smr_proto_inline; + else + return smr_proto_inject; + } - if (use_ipc && !(op_flags & FI_INJECT)) + if (use_ipc && !(op_flags & FI_INJECT)) { + *smr_flags |= SMR_RETURN_CMD; return smr_proto_ipc; + } if (op_flags & FI_INJECT || total_len <= SMR_INJECT_SIZE) { if (op_flags & FI_DELIVERY_COMPLETE) return smr_proto_inject; + return total_len <= SMR_MSG_DATA_LEN ? smr_proto_inline : smr_proto_inject; } + *smr_flags |= SMR_RETURN_CMD; return vma_avail ? smr_proto_iov: smr_proto_sar; } static ssize_t smr_do_inline(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { cmd->hdr.tx_ctx = 0; - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags); smr_format_inline(cmd, desc, iov, iov_count); return FI_SUCCESS; @@ -463,34 +481,54 @@ static ssize_t smr_do_inline(struct smr_ep *ep, struct smr_region *peer_smr, static ssize_t smr_do_inject(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { - struct smr_pend_entry *pend; + struct smr_pend_entry *pend = NULL; + int ret; - pend = ofi_buf_alloc(ep->pend_pool); - assert(pend); + if (smr_flags & SMR_RETURN_CMD) { + pend = ofi_buf_alloc(ep->pend_pool); + assert(pend); + cmd->hdr.tx_ctx = (uintptr_t) pend; + smr_format_tx_pend(pend, cmd, context, desc, iov, iov_count, + op_flags); + if (smr_freestack_avail(smr_cmd_stack(ep->region)) <= + smr_env.buffer_threshold) + smr_flags |= SMR_BUFFER_RECV; + } else { + cmd->hdr.tx_ctx = 0; + } - cmd->hdr.tx_ctx = (uintptr_t) pend; - smr_format_tx_pend(pend, cmd, context, desc, iov, iov_count, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags); + ret = smr_format_inject(ep, peer_smr, cmd, desc, iov, iov_count, + op_flags); + if (ret) + goto err; - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); - smr_format_inject(ep, cmd, pend); + if (total_len != cmd->hdr.size) { + ret = -FI_ETRUNC; + goto err; + } - if (smr_freestack_avail(smr_cmd_stack(ep->region)) <= - smr_env.buffer_threshold) - cmd->hdr.op_flags |= SMR_BUFFER_RECV; + if (pend && op != ofi_op_read_req) + pend->bytes_done = cmd->hdr.size; return FI_SUCCESS; +err: + if (pend) + ofi_buf_free(pend); + return ret; } static ssize_t smr_do_iov(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, - struct smr_cmd *cmd) + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { struct smr_pend_entry *pend; @@ -500,12 +538,12 @@ static ssize_t smr_do_iov(struct smr_ep *ep, struct smr_region *peer_smr, cmd->hdr.tx_ctx = (uintptr_t) pend; smr_format_tx_pend(pend, cmd, context, desc, iov, iov_count, op_flags); - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags); smr_format_iov(cmd, pend); if (smr_freestack_avail(smr_cmd_stack(ep->region)) <= smr_env.buffer_threshold) - cmd->hdr.op_flags |= SMR_BUFFER_RECV; + cmd->hdr.smr_flags |= SMR_BUFFER_RECV; return FI_SUCCESS; } @@ -513,9 +551,9 @@ static ssize_t smr_do_iov(struct smr_ep *ep, struct smr_region *peer_smr, static ssize_t smr_do_sar(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, - struct smr_cmd *cmd) + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { struct smr_pend_entry *pend; int ret; @@ -534,7 +572,7 @@ static ssize_t smr_do_sar(struct smr_ep *ep, struct smr_region *peer_smr, else pend->sar_copy_fn = &smr_copy_sar; - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags); ret = smr_format_sar(ep, cmd, desc, iov, iov_count, total_len, ep->region, peer_smr, pend); if (ret) @@ -546,9 +584,9 @@ static ssize_t smr_do_sar(struct smr_ep *ep, struct smr_region *peer_smr, static ssize_t smr_do_ipc(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, - struct smr_cmd *cmd) + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { struct smr_pend_entry *pend; int ret = -FI_EAGAIN; @@ -557,7 +595,7 @@ static ssize_t smr_do_ipc(struct smr_ep *ep, struct smr_region *peer_smr, assert(pend); cmd->hdr.tx_ctx = (uintptr_t) pend; - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags); assert(iov_count == 1 && desc && desc[0]); ret = smr_format_ipc(cmd, iov[0].iov_base, total_len, ep->region, desc[0]->iface, desc[0]->device); @@ -568,7 +606,7 @@ static ssize_t smr_do_ipc(struct smr_ep *ep, struct smr_region *peer_smr, "fallback to using SAR\n"); ofi_buf_free(pend); return smr_do_sar(ep, peer_smr, tx_id, rx_id, op, tag, data, - op_flags, desc, iov, iov_count, + op_flags, smr_flags, desc, iov, iov_count, total_len, context, cmd); } @@ -576,7 +614,7 @@ static ssize_t smr_do_ipc(struct smr_ep *ep, struct smr_region *peer_smr, if (smr_freestack_avail(smr_cmd_stack(ep->region)) <= smr_env.buffer_threshold) - cmd->hdr.op_flags |= SMR_BUFFER_RECV; + cmd->hdr.smr_flags |= SMR_BUFFER_RECV; return FI_SUCCESS; } @@ -687,25 +725,17 @@ static int smr_discard(struct fi_peer_rx_entry *rx_entry) struct smr_cmd_ctx *cmd_ctx = rx_entry->peer_context; struct smr_unexp_buf *sar_buf; - switch (cmd_ctx->cmd->hdr.proto) { - case smr_proto_inline: - break; - case smr_proto_sar: - while (!slist_empty(&cmd_ctx->buf_list)) { - slist_remove_head_container( - &cmd_ctx->buf_list, - struct smr_unexp_buf, sar_buf, - entry); - ofi_buf_free(sar_buf); - } - break; - case smr_proto_inject: - case smr_proto_iov: - case smr_proto_ipc: - smr_return_cmd(cmd_ctx->ep, cmd_ctx->cmd); - break; + while (!slist_empty(&cmd_ctx->buf_list)) { + slist_remove_head_container( + &cmd_ctx->buf_list, + struct smr_unexp_buf, sar_buf, + entry); + ofi_buf_free(sar_buf); } + if (cmd_ctx->cmd->hdr.smr_flags & SMR_RETURN_CMD) + smr_return_cmd(cmd_ctx->ep, cmd_ctx->cmd); + ofi_buf_free(cmd_ctx); return FI_SUCCESS; @@ -1002,7 +1032,6 @@ int smr_endpoint(struct fid_domain *domain, struct fi_info *info, goto ep; dlist_init(&ep->sar_list); - dlist_init(&ep->unexp_cmd_list); dlist_init(&ep->async_cpy_list); slist_init(&ep->overflow_list); diff --git a/prov/shm/src/smr_msg.c b/prov/shm/src/smr_msg.c index 29ca9684f80..18e847f3eae 100644 --- a/prov/shm/src/smr_msg.c +++ b/prov/shm/src/smr_msg.c @@ -85,6 +85,7 @@ static ssize_t smr_generic_sendmsg(struct smr_ep *ep, const struct iovec *iov, int proto; struct smr_cmd_entry *ce; struct smr_cmd *cmd; + uint8_t smr_flags; assert(iov_count <= SMR_IOV_LIMIT); @@ -110,9 +111,8 @@ static ssize_t smr_generic_sendmsg(struct smr_ep *ep, const struct iovec *iov, proto = smr_select_proto(desc, iov_count, smr_vma_enabled(ep, peer_smr), smr_ipc_valid(ep, peer_smr, tx_id, rx_id), op, - total_len, op_flags); - - if (proto != smr_proto_inline) { + total_len, op_flags, &smr_flags); + if (smr_flags & SMR_RETURN_CMD) { if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { smr_cmd_queue_discard(ce, pos); ret = -FI_EAGAIN; @@ -128,17 +128,17 @@ static ssize_t smr_generic_sendmsg(struct smr_ep *ep, const struct iovec *iov, } ret = smr_send_ops[proto](ep, peer_smr, tx_id, rx_id, op, tag, data, - op_flags, (struct ofi_mr **) desc, iov, - iov_count, total_len, context, cmd); + op_flags, smr_flags, (struct ofi_mr **) desc, + iov, iov_count, total_len, context, cmd); if (ret) { smr_cmd_queue_discard(ce, pos); - if (proto != smr_proto_inline) + if (smr_flags & SMR_RETURN_CMD) smr_freestack_push(smr_cmd_stack(ep->region), cmd); goto unlock; } smr_cmd_queue_commit(ce, pos); - if (proto != smr_proto_inline) + if (smr_flags & SMR_RETURN_CMD) goto unlock; ret = smr_complete_tx(ep, context, op, op_flags); @@ -205,6 +205,7 @@ static ssize_t smr_generic_inject(struct fid_ep *ep_fid, const void *buf, int proto; struct smr_cmd_entry *ce; struct smr_cmd *cmd; + uint8_t smr_flags; assert(len <= SMR_INJECT_SIZE); @@ -232,36 +233,20 @@ static ssize_t smr_generic_inject(struct fid_ep *ep_fid, const void *buf, goto unlock; } - if (len <= SMR_MSG_DATA_LEN) { - proto = smr_proto_inline; - cmd = &ce->cmd; - } else { - proto = smr_proto_inject; - if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { - smr_cmd_queue_discard(ce, pos); - ret = -FI_EAGAIN; - goto unlock; - } - - cmd = smr_freestack_pop(smr_cmd_stack(ep->region)); - assert(cmd); - ce->ptr = smr_local_to_peer(ep, peer_smr, tx_id, rx_id, - (uintptr_t) cmd); - } + proto = len <= SMR_MSG_DATA_LEN ? smr_proto_inline : smr_proto_inject; + cmd = &ce->cmd; + smr_flags = (op_flags & FI_REMOTE_CQ_DATA) ? SMR_REMOTE_CQ_DATA : 0; ret = smr_send_ops[proto](ep, peer_smr, tx_id, rx_id, op, tag, data, - op_flags, NULL, &msg_iov, 1, len, NULL, cmd); + op_flags, smr_flags, NULL, &msg_iov, 1, len, + NULL, cmd); if (ret) { - if (proto != smr_proto_inline) - smr_freestack_push(smr_cmd_stack(ep->region), cmd); smr_cmd_queue_discard(ce, pos); ret = -FI_EAGAIN; goto unlock; } smr_cmd_queue_commit(ce, pos); - - if (proto == smr_proto_inline) - ofi_ep_peer_tx_cntr_inc(&ep->util_ep, op); + ofi_ep_peer_tx_cntr_inc(&ep->util_ep, op); unlock: ofi_genlock_unlock(&ep->util_ep.lock); diff --git a/prov/shm/src/smr_progress.c b/prov/shm/src/smr_progress.c index 0cb8b9d508f..ea8fd7841d5 100644 --- a/prov/shm/src/smr_progress.c +++ b/prov/shm/src/smr_progress.c @@ -75,8 +75,7 @@ void smr_free_sar_bufs(struct smr_ep *ep, struct smr_cmd *cmd, int i; for (i = cmd->data.buf_batch_size - 1; i >= 0; i--) { - smr_freestack_push_by_index(smr_sar_pool(ep->region), - cmd->data.sar[i]); + smr_return_sar_buf_by_index(ep->region, cmd->data.sar[i]); } smr_peer_data(ep->region)[cmd->hdr.tx_id].sar_status = SMR_SAR_FREE; } @@ -87,6 +86,7 @@ static int smr_progress_return_entry(struct smr_ep *ep, struct smr_cmd *cmd, struct smr_inject_buf *tx_buf = NULL; uint8_t *src; ssize_t hmem_copy_ret; + struct smr_region *peer_smr; int ret = FI_SUCCESS; switch (cmd->hdr.proto) { @@ -132,33 +132,33 @@ static int smr_progress_return_entry(struct smr_ep *ep, struct smr_cmd *cmd, smr_try_send_cmd(ep, cmd); return -FI_EAGAIN; case smr_proto_inject: - tx_buf = smr_get_inject_buf(ep->region, cmd); - if (pend) { - if (pend->bytes_done != cmd->hdr.size && - cmd->hdr.op != ofi_op_atomic) { - src = cmd->hdr.op == ofi_op_atomic_compare ? - tx_buf->buf : tx_buf->data; - hmem_copy_ret = ofi_copy_to_mr_iov( - pend->mr, pend->iov, - pend->iov_count, - 0, src, cmd->hdr.size); - - if (hmem_copy_ret < 0) { - FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "RMA read/fetch failed " - "with code %d\n", - (int)(-hmem_copy_ret)); - ret = hmem_copy_ret; - } else if (hmem_copy_ret != cmd->hdr.size) { - FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "Incomplete rma read/fetch " - "buffer copied\n"); - ret = -FI_ETRUNC; - } else { - pend->bytes_done = - (size_t) hmem_copy_ret; - } + assert(pend); + if (pend->bytes_done != cmd->hdr.size && + cmd->hdr.op != ofi_op_atomic) { + peer_smr = smr_peer_region(ep, cmd->hdr.tx_id); + tx_buf = smr_freestack_get_entry_from_index( + smr_inject_pool(peer_smr), + cmd->data.inject_buf_index); + src = cmd->hdr.op == ofi_op_atomic_compare ? + tx_buf->buf : tx_buf->data; + hmem_copy_ret = ofi_copy_to_mr_iov( + pend->mr, pend->iov, + pend->iov_count, + 0, src, cmd->hdr.size); + + if (hmem_copy_ret < 0) { + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "RMA read/fetch failed " + "with code %d\n", + (int)(-hmem_copy_ret)); + ret = hmem_copy_ret; + } else if (hmem_copy_ret != cmd->hdr.size) { + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "Incomplete rma read/fetch " + "buffer copied\n"); + ret = -FI_ETRUNC; } + smr_return_inject_buf(peer_smr, tx_buf); } break; default: @@ -188,27 +188,26 @@ static void smr_progress_return(struct smr_ep *ep) ret = smr_progress_return_entry(ep, cmd, pending); if (ret != -FI_EAGAIN) { - if (pending) { - if (cmd->hdr.status) { - ret = smr_write_err_comp( - ep->util_ep.tx_cq, - pending->comp_ctx, - pending->comp_flags, - cmd->hdr.tag, - cmd->hdr.status); - } else { - ret = smr_complete_tx( - ep, pending->comp_ctx, - cmd->hdr.op, - pending->comp_flags); - } - if (ret) { - FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "unable to process " - "tx completion\n"); - } - ofi_buf_free(pending); + assert(pending); + if (cmd->hdr.status) { + ret = smr_write_err_comp( + ep->util_ep.tx_cq, + pending->comp_ctx, + pending->comp_flags, + cmd->hdr.tag, + cmd->hdr.status); + } else { + ret = smr_complete_tx( + ep, pending->comp_ctx, + cmd->hdr.op, + pending->comp_flags); + } + if (ret) { + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "unable to process " + "tx completion\n"); } + ofi_buf_free(pending); smr_freestack_push(smr_cmd_stack(ep->region), cmd); } smr_return_queue_release(smr_return_queue(ep->region), @@ -242,19 +241,18 @@ static ssize_t smr_progress_inject(struct smr_ep *ep, struct smr_cmd *cmd, struct ofi_mr **mr, struct iovec *iov, size_t iov_count) { - struct smr_region *peer_smr; struct smr_inject_buf *tx_buf; ssize_t ret; - peer_smr = smr_peer_region(ep, cmd->hdr.rx_id); - tx_buf = smr_get_inject_buf(peer_smr, cmd); - - if (cmd->hdr.op == ofi_op_read_req) { - ret = ofi_copy_from_mr_iov(tx_buf->data, cmd->hdr.size, mr, - iov, iov_count, 0); - } else { + tx_buf = smr_freestack_get_entry_from_index(smr_inject_pool(ep->region), + cmd->data.inject_buf_index); + if (cmd->hdr.op != ofi_op_read_req) { ret = ofi_copy_to_mr_iov(mr, iov, iov_count, 0, tx_buf->data, cmd->hdr.size); + smr_return_inject_buf(ep->region, tx_buf); + } else { + ret = ofi_copy_from_mr_iov(tx_buf->data, cmd->hdr.size, mr, + iov, iov_count, 0); } if (ret < 0) { @@ -262,7 +260,8 @@ static ssize_t smr_progress_inject(struct smr_ep *ep, struct smr_cmd *cmd, "inject recv failed with code %lu\n", ret); } else if (ret != cmd->hdr.size) { FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "inject recv truncated\n"); + "inject recv truncated. Expected size %zu copied size " + "%zu\n", cmd->hdr.size, ret); ret = -FI_ETRUNC; } else { ret = FI_SUCCESS; @@ -419,10 +418,10 @@ static void smr_init_rx_pend(struct smr_pend_entry *pend, struct smr_cmd *cmd, if (rx_entry) { pend->comp_ctx = rx_entry->context; pend->comp_flags = smr_rx_cq_flags(rx_entry->flags, - cmd->hdr.op_flags); + cmd->hdr.smr_flags); } else { pend->comp_ctx = NULL; - pend->comp_flags = smr_rx_cq_flags(0, cmd->hdr.op_flags); + pend->comp_flags = smr_rx_cq_flags(0, cmd->hdr.smr_flags); } pend->cmd = cmd; @@ -591,7 +590,7 @@ static smr_progress_func smr_progress_ops[smr_proto_max] = { static void smr_do_atomic(struct smr_cmd *cmd, void *src, struct ofi_mr *dst_mr, void *dst, void *cmp, enum fi_datatype datatype, - enum fi_op op, size_t cnt, uint16_t flags) + enum fi_op op, size_t cnt) { char tmp_result[SMR_INJECT_SIZE]; char tmp_dst[SMR_INJECT_SIZE]; @@ -647,7 +646,7 @@ static int smr_progress_inline_atomic(struct smr_cmd *cmd, struct ofi_mr **mr, for (i = *len = 0; i < ioc_count && *len < cmd->hdr.size; i++) { smr_do_atomic(cmd, &src[*len], mr[i], ioc[i].addr, NULL, cmd->hdr.datatype, cmd->hdr.atomic_op, - ioc[i].count, cmd->hdr.op_flags); + ioc[i].count); *len += ioc[i].count * ofi_datatype_size(cmd->hdr.datatype); } @@ -667,7 +666,8 @@ static int smr_progress_inject_atomic(struct smr_cmd *cmd, struct ofi_mr **mr, uint8_t *src, *comp; int i; - tx_buf = smr_get_inject_buf(smr_peer_region(ep, cmd->hdr.rx_id), cmd); + tx_buf = smr_freestack_get_entry_from_index(smr_inject_pool(ep->region), + cmd->data.inject_buf_index); switch (cmd->hdr.op) { case ofi_op_atomic_compare: @@ -683,11 +683,11 @@ static int smr_progress_inject_atomic(struct smr_cmd *cmd, struct ofi_mr **mr, for (i = *len = 0; i < ioc_count && *len < cmd->hdr.size; i++) { smr_do_atomic(cmd, &src[*len], mr[i], ioc[i].addr, comp ? &comp[*len] : NULL, cmd->hdr.datatype, - cmd->hdr.atomic_op, ioc[i].count, - cmd->hdr.op_flags); + cmd->hdr.atomic_op, ioc[i].count); *len += ioc[i].count * ofi_datatype_size(cmd->hdr.datatype); } + smr_return_inject_buf(ep->region, tx_buf); if (*len != cmd->hdr.size) { FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "recv truncated"); return -FI_ETRUNC; @@ -703,7 +703,7 @@ static int smr_start_common(struct smr_ep *ep, struct smr_cmd *cmd, uint64_t comp_flags; void *comp_buf; int ret; - bool return_cmd = cmd->hdr.proto != smr_proto_inline; + bool return_cmd = cmd->hdr.smr_flags & SMR_RETURN_CMD; rx_entry->peer_context = NULL; assert (cmd->hdr.proto < smr_proto_max); @@ -711,11 +711,10 @@ static int smr_start_common(struct smr_ep *ep, struct smr_cmd *cmd, ep, cmd, rx_entry, (struct ofi_mr **) rx_entry->desc, rx_entry->iov, rx_entry->count); - if (!cmd->hdr.rx_ctx) { comp_buf = rx_entry->iov[0].iov_base; comp_flags = smr_rx_cq_flags(rx_entry->flags, - cmd->hdr.op_flags); + cmd->hdr.smr_flags); if (ret) { FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "error processing op\n"); @@ -756,7 +755,7 @@ static int smr_copy_saved(struct smr_cmd_ctx *cmd_ctx, int ret; comp_flags = smr_rx_cq_flags(rx_entry->flags, - cmd_ctx->cmd->hdr.op_flags); + cmd_ctx->cmd->hdr.smr_flags); while (!slist_empty(&cmd_ctx->buf_list)) { slist_remove_head_container(&cmd_ctx->buf_list, struct smr_unexp_buf, buf, entry); @@ -813,7 +812,7 @@ int smr_unexp_start(struct fi_peer_rx_entry *rx_entry) struct smr_cmd_ctx *cmd_ctx = rx_entry->peer_context; int ret = FI_SUCCESS; - if (cmd_ctx->cmd->hdr.op_flags & SMR_BUFFER_RECV) + if (cmd_ctx->cmd->hdr.smr_flags & SMR_BUFFER_RECV) ret = smr_copy_saved(cmd_ctx, rx_entry); else ret = smr_start_common(cmd_ctx->ep, cmd_ctx->cmd, rx_entry); @@ -883,34 +882,33 @@ static int smr_unexp_inline(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, static int smr_unexp_inject(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, struct smr_cmd *cmd) { - struct smr_region *peer_smr; - struct smr_unexp_buf *buf; struct smr_inject_buf *tx_buf; - int ret = FI_SUCCESS; - - if (!(cmd->hdr.op_flags & SMR_BUFFER_RECV)) { - cmd_ctx->cmd = cmd; - return FI_SUCCESS; - } + struct smr_unexp_buf *buf; - peer_smr = smr_peer_region(ep, cmd_ctx->cmd->hdr.rx_id); + cmd->hdr.smr_flags |= SMR_BUFFER_RECV; memcpy(&cmd_ctx->cmd_cpy, cmd, sizeof(cmd_ctx->cmd->hdr)); cmd_ctx->cmd = &cmd_ctx->cmd_cpy; + tx_buf = smr_freestack_get_entry_from_index(smr_inject_pool(ep->region), + cmd->data.inject_buf_index); buf = ofi_buf_alloc(ep->unexp_buf_pool); if (!buf) { - ret = -FI_ENOMEM; - cmd->hdr.status = ret; - goto out; + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "Error allocating buffer\n"); + ofi_buf_free(cmd_ctx); + return -FI_ENOMEM; } - tx_buf = smr_get_inject_buf(peer_smr, cmd); - memcpy(buf->buf, tx_buf->buf, cmd_ctx->cmd->hdr.size); + memcpy(buf->buf, tx_buf->data, cmd->hdr.size); + smr_return_inject_buf(ep->region, tx_buf); + slist_init(&cmd_ctx->buf_list); slist_insert_tail(&buf->entry, &cmd_ctx->buf_list); -out: - smr_return_cmd(ep, cmd); - return ret; + + if (cmd->hdr.smr_flags & SMR_RETURN_CMD) + smr_return_cmd(ep, cmd); + + return FI_SUCCESS; } static int smr_unexp_iov(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, @@ -921,7 +919,7 @@ static int smr_unexp_iov(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, struct iovec iov; int ret = FI_SUCCESS; - if (!(cmd->hdr.op_flags & SMR_BUFFER_RECV)) { + if (!(cmd->hdr.smr_flags & SMR_BUFFER_RECV)) { cmd_ctx->cmd = cmd; return FI_SUCCESS; } @@ -966,7 +964,7 @@ static int smr_unexp_sar(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, struct smr_pend_entry *sar_entry; int ret; - cmd->hdr.op_flags |= SMR_BUFFER_RECV; + cmd->hdr.smr_flags |= SMR_BUFFER_RECV; cmd_ctx->cmd = &cmd_ctx->cmd_cpy; memcpy(&cmd_ctx->cmd_cpy, cmd, @@ -995,6 +993,7 @@ static int smr_unexp_sar(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, cmd->hdr.status = ret; } out: + assert(cmd->hdr.smr_flags & SMR_RETURN_CMD); smr_return_cmd(ep, cmd); return FI_SUCCESS; } @@ -1008,7 +1007,7 @@ static int smr_unexp_ipc(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, struct iovec iov; int ret = FI_SUCCESS;; - if (!(cmd->hdr.op_flags & SMR_BUFFER_RECV)) { + if (!(cmd->hdr.smr_flags & SMR_BUFFER_RECV)) { cmd_ctx->cmd = cmd; return FI_SUCCESS; } @@ -1086,7 +1085,7 @@ static int smr_alloc_cmd_ctx(struct smr_ep *ep, slist_init(&cmd_ctx->buf_list); rx_entry->msg_size = cmd->hdr.size; - if (cmd->hdr.op_flags & SMR_REMOTE_CQ_DATA) { + if (cmd->hdr.smr_flags & SMR_REMOTE_CQ_DATA) { rx_entry->flags |= FI_REMOTE_CQ_DATA; rx_entry->cq_data = cmd->hdr.cq_data; } @@ -1099,7 +1098,6 @@ static int smr_alloc_cmd_ctx(struct smr_ep *ep, return ret; } - dlist_insert_tail(&cmd_ctx->entry, &ep->unexp_cmd_list); return FI_SUCCESS; } @@ -1167,7 +1165,7 @@ static int smr_progress_cmd_rma(struct smr_ep *ep, struct smr_cmd *cmd) int ret = 0; struct ofi_mr *mr[SMR_IOV_LIMIT]; struct smr_pend_entry *pend; - bool return_cmd = cmd->hdr.proto != smr_proto_inline; + bool return_cmd = cmd->hdr.smr_flags & SMR_RETURN_CMD; if (cmd->hdr.rx_ctx) return smr_progress_pending(ep, cmd); @@ -1211,11 +1209,11 @@ static int smr_progress_cmd_rma(struct smr_ep *ep, struct smr_cmd *cmd) FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "error processing rma op\n"); ret = smr_write_err_comp(ep->util_ep.rx_cq, NULL, - smr_rx_cq_flags(0, cmd->hdr.op_flags), + smr_rx_cq_flags(0, cmd->hdr.smr_flags), 0, ret); } else { ret = smr_complete_rx(ep, NULL, cmd->hdr.op, - smr_rx_cq_flags(0, cmd->hdr.op_flags), + smr_rx_cq_flags(0, cmd->hdr.smr_flags), cmd->hdr.size, iov_count ? iov[0].iov_base : NULL, cmd->hdr.rx_id, 0, cmd->hdr.cq_data); @@ -1286,11 +1284,11 @@ static int smr_progress_cmd_atomic(struct smr_ep *ep, struct smr_cmd *cmd) "error processing atomic op\n"); ret = smr_write_err_comp(ep->util_ep.rx_cq, NULL, smr_rx_cq_flags(0, - cmd->hdr.op_flags), 0, err); + cmd->hdr.smr_flags), 0, err); } else { ret = smr_complete_rx(ep, NULL, cmd->hdr.op, smr_rx_cq_flags(0, - cmd->hdr.op_flags), total_len, + cmd->hdr.smr_flags), total_len, ioc_count ? ioc[0].addr : NULL, cmd->hdr.rx_id, 0, cmd->hdr.cq_data); } @@ -1301,7 +1299,7 @@ static int smr_progress_cmd_atomic(struct smr_ep *ep, struct smr_cmd *cmd) } out: - if (cmd->hdr.proto != smr_proto_inline) + if (cmd->hdr.smr_flags & SMR_RETURN_CMD) smr_return_cmd(ep, cmd); return err; } diff --git a/prov/shm/src/smr_rma.c b/prov/shm/src/smr_rma.c index 5c191e7d36c..13384c2ca1b 100644 --- a/prov/shm/src/smr_rma.c +++ b/prov/shm/src/smr_rma.c @@ -128,12 +128,13 @@ static ssize_t smr_generic_rma( { struct smr_region *peer_smr; int64_t tx_id, rx_id; - int proto = smr_proto_inline; + int proto; ssize_t ret = -FI_EAGAIN; size_t total_len; struct smr_cmd_entry *ce; struct smr_cmd *cmd; int64_t pos; + uint8_t smr_flags; assert(iov_count <= SMR_IOV_LIMIT); assert(rma_count <= SMR_IOV_LIMIT); @@ -169,8 +170,8 @@ static ssize_t smr_generic_rma( proto = smr_select_proto(desc, iov_count, smr_vma_enabled(ep, peer_smr), smr_ipc_valid(ep, peer_smr, tx_id, rx_id), op, - total_len, op_flags); - if (proto != smr_proto_inline) { + total_len, op_flags, &smr_flags); + if (smr_flags & SMR_RETURN_CMD) { if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { smr_cmd_queue_discard(ce, pos); ret = -FI_EAGAIN; @@ -184,10 +185,10 @@ static ssize_t smr_generic_rma( cmd = &ce->cmd; } ret = smr_send_ops[proto](ep, peer_smr, tx_id, rx_id, op, 0, data, - op_flags, (struct ofi_mr **)desc, iov, - iov_count, total_len, context, cmd); + op_flags, smr_flags, (struct ofi_mr **)desc, + iov, iov_count, total_len, context, cmd); if (ret) { - if (proto != smr_proto_inline) + if (smr_flags & SMR_RETURN_CMD) smr_freestack_push(smr_cmd_stack(ep->region), cmd); smr_cmd_queue_discard(ce, pos); goto unlock; @@ -196,7 +197,7 @@ static ssize_t smr_generic_rma( smr_add_rma_cmd(peer_smr, rma_iov, rma_count, cmd); smr_cmd_queue_commit(ce, pos); - if (proto != smr_proto_inline || op == ofi_op_read_req) + if (smr_flags & SMR_RETURN_CMD) goto unlock; ret = smr_complete_tx(ep, context, op, op_flags); @@ -320,7 +321,7 @@ static ssize_t smr_writemsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg, static ssize_t smr_generic_rma_inject( struct fid_ep *ep_fid, const void *buf, size_t len, fi_addr_t dest_addr, uint64_t addr, uint64_t key, uint64_t data, - uint64_t flags) + uint64_t op_flags) { struct smr_ep *ep; struct smr_region *peer_smr; @@ -332,6 +333,7 @@ static ssize_t smr_generic_rma_inject( struct smr_cmd *cmd; struct smr_cmd_entry *ce; int64_t pos; + uint8_t smr_flags; assert(len <= SMR_INJECT_SIZE); ep = container_of(ep_fid, struct smr_ep, util_ep.ep_fid.fid); @@ -359,36 +361,21 @@ static ssize_t smr_generic_rma_inject( goto unlock; } - if (len <= SMR_MSG_DATA_LEN) { - proto = smr_proto_inline; - cmd = &ce->cmd; - } else { - proto = smr_proto_inject; - if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { - smr_cmd_queue_discard(ce, pos); - ret = -FI_EAGAIN; - goto unlock; - } - - cmd = smr_freestack_pop(smr_cmd_stack(ep->region)); - assert(cmd); - ce->ptr = smr_local_to_peer(ep, peer_smr, tx_id, rx_id, - (uintptr_t) cmd); - } + proto = len <= SMR_MSG_DATA_LEN ? smr_proto_inline : smr_proto_inject; + cmd = &ce->cmd; + smr_flags = (op_flags & FI_REMOTE_CQ_DATA) ? SMR_REMOTE_CQ_DATA : 0; ret = smr_send_ops[proto](ep, peer_smr, tx_id, rx_id, ofi_op_write, 0, - data, flags, NULL, &iov, 1, len, NULL, cmd); + data, op_flags, smr_flags, NULL, &iov, 1, len, + NULL, cmd); if (ret) { - if (proto != smr_proto_inline) - smr_freestack_push(smr_cmd_stack(ep->region), cmd); smr_cmd_queue_discard(ce, pos); goto unlock; } smr_add_rma_cmd(peer_smr, &rma_iov, 1, cmd); smr_cmd_queue_commit(ce, pos); - if (proto == smr_proto_inline) - ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_write); + ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_write); unlock: ofi_genlock_unlock(&ep->util_ep.lock); return ret; diff --git a/prov/shm/src/smr_util.c b/prov/shm/src/smr_util.c index 71340bdf1a8..d8f5e6dd4c5 100644 --- a/prov/shm/src/smr_util.c +++ b/prov/shm/src/smr_util.c @@ -37,6 +37,7 @@ struct dlist_entry ep_name_list; DEFINE_LIST(ep_name_list); pthread_mutex_t ep_list_lock = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t inject_pool_lock = PTHREAD_MUTEX_INITIALIZER; void smr_cleanup(void) { @@ -106,8 +107,9 @@ size_t smr_calculate_size_offsets(size_t tx_count, size_t rx_count, sizeof(struct smr_cmd_queue_entry) * rx_size; inject_pool_offset = cmd_stack_offset + freestack_size(sizeof(struct smr_cmd), tx_size); - ret_queue_offset = inject_pool_offset + sizeof(struct smr_inject_buf) * - tx_size; + ret_queue_offset = inject_pool_offset + + freestack_size(sizeof(struct smr_inject_buf), + rx_size); ret_queue_offset = ofi_get_aligned_size(ret_queue_offset, 64); sar_pool_offset = ret_queue_offset + sizeof(struct smr_return_queue) + sizeof(struct smr_return_queue_entry) * tx_size; @@ -287,6 +289,8 @@ int smr_create(const struct fi_provider *prov, const struct smr_attr *attr, (*smr)->max_sar_buf_per_peer = SMR_BUF_BATCH_MAX; smr_cmd_queue_init(smr_cmd_queue(*smr), rx_size, smr_cmd_init); + smr_freestack_init(smr_inject_pool(*smr), rx_size, + sizeof(struct smr_inject_buf)); smr_return_queue_init(smr_return_queue(*smr), tx_size, NULL); smr_freestack_init(smr_cmd_stack(*smr), tx_size, @@ -300,6 +304,7 @@ int smr_create(const struct fi_provider *prov, const struct smr_attr *attr, smr_peer_data(*smr)[i].xpmem.avail = false; } + ofi_spin_init(&(*smr)->fs_lock); strncpy((char *) smr_name(*smr), attr->name, total_size - name_offset); /* Must be set last to signal full initialization to peers */ @@ -318,6 +323,10 @@ int smr_create(const struct fi_provider *prov, const struct smr_attr *attr, void smr_free(struct smr_region *smr) { + /* + * TODO: Figure out if we should cleanup resources in the smr before + * unlinking and unmapping. + */ if (smr->flags & SMR_FLAG_HMEM_ENABLED) (void) ofi_hmem_host_unregister(smr); shm_unlink(smr_name(smr)); diff --git a/prov/shm/src/smr_util.h b/prov/shm/src/smr_util.h index 12d6a0af871..17b33328af4 100644 --- a/prov/shm/src/smr_util.h +++ b/prov/shm/src/smr_util.h @@ -35,8 +35,8 @@ #include "ofi.h" #include "ofi_atomic_queue.h" +#include "ofi_lock.h" #include "ofi_xpmem.h" -#include #ifdef __cplusplus extern "C" { @@ -74,6 +74,7 @@ extern struct smr_env smr_env; #define SMR_REMOTE_CQ_DATA (1 << 0) #define SMR_BUFFER_RECV (1 << 1) +#define SMR_RETURN_CMD (1 << 2) enum { smr_proto_inline, /* inline payload */ @@ -119,7 +120,7 @@ struct smr_cmd_hdr { int16_t tx_id; uint8_t op; uint8_t proto; - uint8_t op_flags; + uint8_t smr_flags; uint8_t resv[1]; }; @@ -145,6 +146,7 @@ struct smr_cmd_rma { struct smr_cmd_data { union { uint8_t msg[SMR_MSG_DATA_LEN]; + uint64_t inject_buf_index; struct { size_t iov_count; struct iovec iov[SMR_IOV_LIMIT]; @@ -244,6 +246,8 @@ struct smr_region { uintptr_t base_addr; size_t total_size; + + ofi_spin_t fs_lock; }; uint8_t pad[SMR_PREFETCH_SZ]; }; @@ -314,9 +318,9 @@ static inline struct smr_freestack *smr_cmd_stack(struct smr_region *smr) { return (struct smr_freestack *) ((char *) smr + smr->cmd_stack_offset); } -static inline struct smr_inject_buf *smr_inject_pool(struct smr_region *smr) +static inline struct smr_freestack *smr_inject_pool(struct smr_region *smr) { - return (struct smr_inject_buf *) + return (struct smr_freestack *) ((char *) smr + smr->inject_pool_offset); } static inline struct smr_return_queue *smr_return_queue(struct smr_region *smr) @@ -337,11 +341,32 @@ static inline const char *smr_name(struct smr_region *smr) return (const char *) smr + smr->name_offset; } -static inline struct smr_inject_buf *smr_get_inject_buf(struct smr_region *smr, - struct smr_cmd *cmd) +static inline struct smr_inject_buf *smr_get_inject_buf(struct smr_region *smr) +{ + struct smr_inject_buf *buf; + ofi_spin_lock(&smr->fs_lock); + if (!smr_freestack_isempty(smr_inject_pool(smr))) + buf = smr_freestack_pop(smr_inject_pool(smr)); + else + buf = NULL; + ofi_spin_unlock(&smr->fs_lock); + return buf; +} + +static inline void smr_return_inject_buf(struct smr_region *smr, + struct smr_inject_buf *buf) +{ + ofi_spin_lock(&smr->fs_lock); + smr_freestack_push(smr_inject_pool(smr), buf); + ofi_spin_unlock(&smr->fs_lock); +} + +static inline void smr_return_sar_buf_by_index(struct smr_region *smr, + size_t index) { - return &smr_inject_pool(smr)[smr_freestack_get_index(smr_cmd_stack(smr), - (char *) cmd)]; + ofi_spin_lock(&smr->fs_lock); + smr_freestack_push_by_index(smr_sar_pool(smr), index); + ofi_spin_unlock(&smr->fs_lock); } struct smr_attr {