diff --git a/prov/shm/src/smr.h b/prov/shm/src/smr.h index cc05852b43b..95d4f240fea 100644 --- a/prov/shm/src/smr.h +++ b/prov/shm/src/smr.h @@ -256,20 +256,21 @@ void smr_format_tx_pend(struct smr_pend_entry *pend, struct smr_cmd *cmd, uint64_t op_flags); void smr_generic_format(struct smr_cmd *cmd, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, - uint64_t op_flags); + uint8_t smr_flags, uintptr_t tx_ctx); size_t smr_copy_to_sar(struct smr_ep *ep, struct smr_region *smr, struct smr_pend_entry *pend); size_t smr_copy_from_sar(struct smr_ep *ep, struct smr_region *smr, struct smr_pend_entry *pend); int smr_select_proto(void **desc, size_t iov_count, bool cma_avail, bool ipc_valid, uint32_t op, uint64_t total_len, - uint64_t op_flags); + uint64_t op_flags, uint8_t *smr_flags); typedef ssize_t (*smr_send_func)( struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, - uint64_t data, uint64_t op_flags, struct ofi_mr **desc, - const struct iovec *iov, size_t iov_count, size_t total_len, - void *context, struct smr_cmd *cmd); + uint64_t data, uint64_t op_flags, uint8_t smr_flags, + struct ofi_mr **desc, const struct iovec *iov, + size_t iov_count, size_t total_len, void *context, + struct smr_cmd *cmd); extern smr_send_func smr_send_ops[smr_proto_max]; int smr_write_err_comp(struct util_cq *cq, void *context, @@ -280,9 +281,9 @@ int smr_complete_rx(struct smr_ep *ep, void *context, uint32_t op, uint64_t flags, size_t len, void *buf, int64_t id, uint64_t tag, uint64_t data); -static inline uint64_t smr_rx_cq_flags(uint64_t rx_flags, uint16_t op_flags) +static inline uint64_t smr_rx_cq_flags(uint64_t rx_flags, uint8_t smr_flags) { - if (op_flags & SMR_REMOTE_CQ_DATA) + if (smr_flags & SMR_REMOTE_CQ_DATA) rx_flags |= FI_REMOTE_CQ_DATA; return rx_flags; } diff --git a/prov/shm/src/smr_atomic.c b/prov/shm/src/smr_atomic.c index acd062397a3..36e6bd15e93 100644 --- a/prov/shm/src/smr_atomic.c +++ b/prov/shm/src/smr_atomic.c @@ -64,12 +64,12 @@ static void smr_do_atomic_inline( struct ofi_mr **desc, const struct iovec *iov, size_t iov_count, size_t total_len, struct smr_cmd *cmd) { - smr_generic_format(cmd, tx_id, rx_id, op, 0, 0, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, 0, 0, op_flags, 0); smr_generic_atomic_format(cmd, datatype, atomic_op); smr_format_inline_atomic(cmd, desc, iov, iov_count); } -static void smr_format_inject_atomic( +static int smr_format_inject_atomic( struct smr_cmd *cmd, struct ofi_mr **desc, const struct iovec *iov, size_t count, const struct iovec *resultv, size_t result_count, @@ -80,8 +80,17 @@ static void smr_format_inject_atomic( size_t comp_size; cmd->hdr.proto = smr_proto_inject; + tx_buf = smr_get_inject_buf(smr); + if (!tx_buf) { + FI_DBG(&smr_prov, FI_LOG_EP_DATA, + "No inject buffers available, cannot send inject " + "message\n"); + return -FI_EAGAIN; + } - tx_buf = smr_get_inject_buf(smr, cmd); + cmd->data.inject_buf_index = smr_freestack_get_index( + smr_inject_pool(smr), + (char *)tx_buf); switch (cmd->hdr.op) { case ofi_op_atomic: cmd->hdr.size = ofi_copy_from_mr_iov( @@ -112,9 +121,11 @@ static void smr_format_inject_atomic( assert(0); break; } + + return FI_SUCCESS; } -static ssize_t smr_do_atomic_inject( +static int smr_do_atomic_inject( struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t op_flags, uint8_t datatype, uint8_t atomic_op, @@ -123,38 +134,44 @@ static ssize_t smr_do_atomic_inject( const struct iovec *resultv, size_t result_count, struct ofi_mr **comp_desc, const struct iovec *compv, size_t comp_count, size_t total_len, void *context, - uint16_t smr_flags, struct smr_cmd *cmd) + uint8_t smr_flags, struct smr_cmd *cmd) { - struct smr_pend_entry *pend; + struct smr_pend_entry *pend = NULL; + int ret; - smr_generic_format(cmd, tx_id, rx_id, op, 0, 0, op_flags); - smr_generic_atomic_format(cmd, datatype, atomic_op); - smr_format_inject_atomic(cmd, desc, iov, iov_count, resultv, - result_count, comp_desc, compv, comp_count, - ep->region); - if (op == ofi_op_atomic_fetch || op == ofi_op_atomic_compare || - atomic_op == FI_ATOMIC_READ || op_flags & FI_DELIVERY_COMPLETE) { + if (smr_flags & SMR_RETURN_CMD) { pend = ofi_buf_alloc(ep->pend_pool); assert(pend); - cmd->hdr.tx_ctx = (uintptr_t) pend; smr_format_tx_pend(pend, cmd, context, res_desc, resultv, result_count, op_flags); - } else { - cmd->hdr.tx_ctx = 0; } + smr_generic_format(cmd, tx_id, rx_id, op, 0, 0, smr_flags, + (uintptr_t) pend); + smr_generic_atomic_format(cmd, datatype, atomic_op); + ret = smr_format_inject_atomic(cmd, desc, iov, iov_count, resultv, + result_count, comp_desc, compv, + comp_count, peer_smr); + if (ret) + return ret; + return FI_SUCCESS; } static int smr_select_atomic_proto(uint32_t op, uint64_t total_len, - uint64_t op_flags) + uint64_t op_flags, uint8_t *smr_flags) { + *smr_flags = 0; + assert(!(op_flags & FI_REMOTE_CQ_DATA)); if (op == ofi_op_atomic_compare || op == ofi_op_atomic_fetch || - op_flags & FI_DELIVERY_COMPLETE || total_len > SMR_MSG_DATA_LEN) + op_flags & FI_DELIVERY_COMPLETE) { + *smr_flags |= SMR_RETURN_CMD; return smr_proto_inject; + } - return smr_proto_inline; + return total_len > SMR_MSG_DATA_LEN ? smr_proto_inject : + smr_proto_inline; } static ssize_t smr_generic_atomic( @@ -174,12 +191,12 @@ static ssize_t smr_generic_atomic( struct iovec iov[SMR_IOV_LIMIT]; struct iovec compare_iov[SMR_IOV_LIMIT]; struct iovec result_iov[SMR_IOV_LIMIT]; - uint16_t smr_flags = 0; int64_t tx_id, rx_id, pos; int proto; ssize_t ret; size_t total_len; uint64_t atomic_flags; + uint8_t smr_flags; assert(count <= SMR_IOV_LIMIT); assert(result_count <= SMR_IOV_LIMIT); @@ -248,15 +265,8 @@ static ssize_t smr_generic_atomic( break; } - proto = smr_select_atomic_proto(op, total_len, op_flags); - - if (proto == smr_proto_inline) { - cmd = &ce->cmd; - smr_do_atomic_inline(ep, peer_smr, tx_id, rx_id, ofi_op_atomic, - op_flags, datatype, atomic_op, - (struct ofi_mr **) desc, iov, count, - total_len, cmd); - } else { + proto = smr_select_atomic_proto(op, total_len, op_flags, &smr_flags); + if (smr_flags & SMR_RETURN_CMD) { if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { smr_cmd_queue_discard(ce, pos); ret = -FI_EAGAIN; @@ -266,7 +276,17 @@ static ssize_t smr_generic_atomic( cmd = smr_freestack_pop(smr_cmd_stack(ep->region)); assert(cmd); ce->ptr = smr_local_to_peer(ep, peer_smr, tx_id, rx_id, - (uintptr_t) cmd); + (uintptr_t) cmd); + } else { + cmd = &ce->cmd; + } + + if (proto == smr_proto_inline) { + smr_do_atomic_inline(ep, peer_smr, tx_id, rx_id, ofi_op_atomic, + op_flags, datatype, atomic_op, + (struct ofi_mr **) desc, iov, count, + total_len, cmd); + } else { ret = smr_do_atomic_inject(ep, peer_smr, tx_id, rx_id, op, op_flags, datatype, atomic_op, (struct ofi_mr **) desc, iov, count, @@ -281,16 +301,17 @@ static ssize_t smr_generic_atomic( } } - if (!cmd->hdr.tx_ctx) { - ret = smr_complete_tx(ep, context, op, op_flags); - if (ret) { - FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "unable to process tx completion\n"); - } - } - smr_format_rma_ioc(cmd, rma_ioc, rma_count); smr_cmd_queue_commit(ce, pos); + + if (smr_flags & SMR_RETURN_CMD) + goto unlock; + + ret = smr_complete_tx(ep, context, op, op_flags); + if (ret) + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "unable to process tx completion\n"); + unlock: ofi_genlock_unlock(&ep->util_ep.lock); return ret; @@ -371,7 +392,6 @@ static ssize_t smr_atomic_inject( int64_t id, peer_id, pos; ssize_t ret; size_t total_len; - int proto; ep = container_of(ep_fid, struct smr_ep, util_ep.ep_fid.fid); @@ -412,24 +432,12 @@ static ssize_t smr_atomic_inject( rma_ioc.count = count; rma_ioc.key = key; + cmd = &ce->cmd; if (total_len <= SMR_MSG_DATA_LEN) { - proto = smr_proto_inline; - cmd = &ce->cmd; smr_do_atomic_inline(ep, peer_smr, id, peer_id, ofi_op_atomic, 0, datatype, op, NULL, &iov, 1, total_len, &ce->cmd); } else { - proto = smr_proto_inject; - if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { - smr_cmd_queue_discard(ce, pos); - ret = -FI_EAGAIN; - goto unlock; - } - - cmd = smr_freestack_pop(smr_cmd_stack(ep->region)); - assert(cmd); - ce->ptr = smr_local_to_peer(ep, peer_smr, id, peer_id, - (uintptr_t) cmd); ret = smr_do_atomic_inject(ep, peer_smr, id, peer_id, ofi_op_atomic, 0, datatype, op, NULL, &iov, 1, NULL, NULL, 0, NULL, NULL, @@ -442,9 +450,7 @@ static ssize_t smr_atomic_inject( smr_format_rma_ioc(cmd, &rma_ioc, 1); smr_cmd_queue_commit(ce, pos); - - if (proto == smr_proto_inline) - ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_atomic); + ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_atomic); unlock: ofi_genlock_unlock(&ep->util_ep.lock); return ret; diff --git a/prov/shm/src/smr_ep.c b/prov/shm/src/smr_ep.c index f46e787a947..d8c8f2415d6 100644 --- a/prov/shm/src/smr_ep.c +++ b/prov/shm/src/smr_ep.c @@ -245,19 +245,20 @@ void smr_format_tx_pend(struct smr_pend_entry *pend, struct smr_cmd *cmd, void smr_generic_format(struct smr_cmd *cmd, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, - uint64_t op_flags) + uint8_t smr_flags, uintptr_t tx_ctx) { - cmd->hdr.op = op; - cmd->hdr.status = 0; - cmd->hdr.op_flags = 0; - cmd->hdr.tag = tag; - cmd->hdr.tx_id = tx_id; - cmd->hdr.rx_id = rx_id; - cmd->hdr.cq_data = data; - cmd->hdr.rx_ctx = 0; - - if (op_flags & FI_REMOTE_CQ_DATA) - cmd->hdr.op_flags |= SMR_REMOTE_CQ_DATA; + struct smr_cmd_hdr hdr; + + hdr.op = op; + hdr.smr_flags = smr_flags; + hdr.rx_id = rx_id; + hdr.tx_id = tx_id; + hdr.cq_data = data; + hdr.tag = tag; + hdr.rx_ctx = 0; + hdr.tx_ctx = tx_ctx; + + cmd->hdr = hdr; } static void smr_format_inline(struct smr_cmd *cmd, struct ofi_mr **mr, @@ -268,24 +269,33 @@ static void smr_format_inline(struct smr_cmd *cmd, struct ofi_mr **mr, mr, iov, count, 0); } -static void smr_format_inject(struct smr_ep *ep, struct smr_cmd *cmd, - struct smr_pend_entry *pend) +static int smr_format_inject(struct smr_ep *ep, struct smr_region *peer_smr, + struct smr_cmd *cmd, struct ofi_mr **mr, + const struct iovec *iov, uint32_t iov_count, + uint64_t op_flags) { struct smr_inject_buf *tx_buf; - tx_buf = smr_get_inject_buf(ep->region, cmd); - cmd->hdr.proto = smr_proto_inject; - if (cmd->hdr.op != ofi_op_read_req) { + tx_buf = smr_get_inject_buf(peer_smr); + if (!tx_buf) { + FI_DBG(&smr_prov, FI_LOG_EP_DATA, + "No inject buffers available, cannot send inject " + "message\n"); + return -FI_EAGAIN; + } + + cmd->data.inject_buf_index = smr_freestack_get_index( + smr_inject_pool(peer_smr), + (char *)tx_buf); + if (cmd->hdr.op != ofi_op_read_req) cmd->hdr.size = ofi_copy_from_mr_iov(tx_buf->data, SMR_INJECT_SIZE, - pend->mr, pend->iov, - pend->iov_count, 0); - pend->bytes_done = cmd->hdr.size; - } else { - cmd->hdr.size = ofi_total_iov_len(pend->iov, pend->iov_count); - pend->bytes_done = 0; - } + mr, iov, iov_count, 0); + else + cmd->hdr.size = ofi_total_iov_len(iov, iov_count); + + return FI_SUCCESS; } static void smr_format_iov(struct smr_cmd *cmd, struct smr_pend_entry *pend) @@ -379,9 +389,8 @@ static int smr_format_sar(struct smr_ep *ep, struct smr_cmd *cmd, ret = pend->sar_copy_fn(ep, pend); if (ret < 0 && ret != -FI_EBUSY) { for (i = cmd->data.buf_batch_size - 1; i >= 0; i--) { - smr_freestack_push_by_index( - smr_sar_pool(ep->region), - cmd->data.sar[i]); + smr_return_sar_buf_by_index(ep->region, + cmd->data.sar[i]); } return -FI_EAGAIN; } @@ -398,12 +407,15 @@ static int smr_format_sar(struct smr_ep *ep, struct smr_cmd *cmd, int smr_select_proto(void **desc, size_t iov_count, bool vma_avail, bool ipc_valid, uint32_t op, uint64_t total_len, - uint64_t op_flags) + uint64_t op_flags, uint8_t *smr_flags) { struct ofi_mr *smr_desc; enum fi_hmem_iface iface = FI_HMEM_SYSTEM; bool fastcopy_avail = false, use_ipc = false; + *smr_flags = 0; + *smr_flags |= (op_flags & FI_DELIVERY_COMPLETE) ? SMR_RETURN_CMD : 0; + *smr_flags |= (op_flags & FI_REMOTE_CQ_DATA) ? SMR_REMOTE_CQ_DATA : 0; /* Do not inline/inject if IPC is available so device to device * transfer may occur if possible. */ if (iov_count == 1 && desc && desc[0] && ipc_valid) { @@ -420,6 +432,7 @@ int smr_select_proto(void **desc, size_t iov_count, bool vma_avail, } if (op == ofi_op_read_req) { + *smr_flags |= SMR_RETURN_CMD; if (use_ipc) return smr_proto_ipc; if (total_len <= SMR_INJECT_SIZE) @@ -429,32 +442,40 @@ int smr_select_proto(void **desc, size_t iov_count, bool vma_avail, return smr_proto_sar; } - if (fastcopy_avail && total_len <= smr_env.max_gdrcopy_size) - return total_len <= SMR_MSG_DATA_LEN ? smr_proto_inline : - smr_proto_inject; + if (fastcopy_avail && total_len <= smr_env.max_gdrcopy_size) { + if (total_len <= SMR_MSG_DATA_LEN && + !(op_flags & FI_DELIVERY_COMPLETE)) + return smr_proto_inline; + else + return smr_proto_inject; + } - if (use_ipc && !(op_flags & FI_INJECT)) + if (use_ipc && !(op_flags & FI_INJECT)) { + *smr_flags |= SMR_RETURN_CMD; return smr_proto_ipc; + } if (op_flags & FI_INJECT || total_len <= SMR_INJECT_SIZE) { if (op_flags & FI_DELIVERY_COMPLETE) return smr_proto_inject; + return total_len <= SMR_MSG_DATA_LEN ? smr_proto_inline : smr_proto_inject; } + *smr_flags |= SMR_RETURN_CMD; return vma_avail ? smr_proto_iov: smr_proto_sar; } static ssize_t smr_do_inline(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { - cmd->hdr.tx_ctx = 0; - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags, 0); smr_format_inline(cmd, desc, iov, iov_count); return FI_SUCCESS; @@ -463,49 +484,68 @@ static ssize_t smr_do_inline(struct smr_ep *ep, struct smr_region *peer_smr, static ssize_t smr_do_inject(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { - struct smr_pend_entry *pend; + struct smr_pend_entry *pend = NULL; + int ret; - pend = ofi_buf_alloc(ep->pend_pool); - assert(pend); + if (smr_flags & SMR_RETURN_CMD) { + pend = ofi_buf_alloc(ep->pend_pool); + assert(pend); + smr_format_tx_pend(pend, cmd, context, desc, iov, iov_count, + op_flags); + if (smr_freestack_avail(smr_cmd_stack(ep->region)) <= + smr_env.buffer_threshold) + smr_flags |= SMR_BUFFER_RECV; + } - cmd->hdr.tx_ctx = (uintptr_t) pend; - smr_format_tx_pend(pend, cmd, context, desc, iov, iov_count, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags, + (uintptr_t) pend); + ret = smr_format_inject(ep, peer_smr, cmd, desc, iov, iov_count, + op_flags); + if (ret) + goto err; - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); - smr_format_inject(ep, cmd, pend); + if (total_len != cmd->hdr.size) { + ret = -FI_ETRUNC; + goto err; + } - if (smr_freestack_avail(smr_cmd_stack(ep->region)) <= - smr_env.buffer_threshold) - cmd->hdr.op_flags |= SMR_BUFFER_RECV; + if (pend && op != ofi_op_read_req) + pend->bytes_done = cmd->hdr.size; return FI_SUCCESS; +err: + if (pend) + ofi_buf_free(pend); + return ret; } static ssize_t smr_do_iov(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, - struct smr_cmd *cmd) + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { struct smr_pend_entry *pend; pend = ofi_buf_alloc(ep->pend_pool); assert(pend); - cmd->hdr.tx_ctx = (uintptr_t) pend; smr_format_tx_pend(pend, cmd, context, desc, iov, iov_count, op_flags); - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags, + (uintptr_t) pend); + smr_format_iov(cmd, pend); if (smr_freestack_avail(smr_cmd_stack(ep->region)) <= smr_env.buffer_threshold) - cmd->hdr.op_flags |= SMR_BUFFER_RECV; + cmd->hdr.smr_flags |= SMR_BUFFER_RECV; return FI_SUCCESS; } @@ -513,9 +553,9 @@ static ssize_t smr_do_iov(struct smr_ep *ep, struct smr_region *peer_smr, static ssize_t smr_do_sar(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, - struct smr_cmd *cmd) + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { struct smr_pend_entry *pend; int ret; @@ -523,7 +563,6 @@ static ssize_t smr_do_sar(struct smr_ep *ep, struct smr_region *peer_smr, pend = ofi_buf_alloc(ep->pend_pool); assert(pend); - cmd->hdr.tx_ctx = (uintptr_t) pend; smr_format_tx_pend(pend, cmd, context, desc, iov, iov_count, op_flags); pend->sar_dir = op == ofi_op_read_req ? @@ -534,7 +573,9 @@ static ssize_t smr_do_sar(struct smr_ep *ep, struct smr_region *peer_smr, else pend->sar_copy_fn = &smr_copy_sar; - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags, + (uintptr_t) pend); + ret = smr_format_sar(ep, cmd, desc, iov, iov_count, total_len, ep->region, peer_smr, pend); if (ret) @@ -546,9 +587,9 @@ static ssize_t smr_do_sar(struct smr_ep *ep, struct smr_region *peer_smr, static ssize_t smr_do_ipc(struct smr_ep *ep, struct smr_region *peer_smr, int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag, uint64_t data, uint64_t op_flags, - struct ofi_mr **desc, const struct iovec *iov, - size_t iov_count, size_t total_len, void *context, - struct smr_cmd *cmd) + uint8_t smr_flags, struct ofi_mr **desc, + const struct iovec *iov, size_t iov_count, + size_t total_len, void *context, struct smr_cmd *cmd) { struct smr_pend_entry *pend; int ret = -FI_EAGAIN; @@ -556,8 +597,8 @@ static ssize_t smr_do_ipc(struct smr_ep *ep, struct smr_region *peer_smr, pend = ofi_buf_alloc(ep->pend_pool); assert(pend); - cmd->hdr.tx_ctx = (uintptr_t) pend; - smr_generic_format(cmd, tx_id, rx_id, op, tag, data, op_flags); + smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags, + (uintptr_t) pend); assert(iov_count == 1 && desc && desc[0]); ret = smr_format_ipc(cmd, iov[0].iov_base, total_len, ep->region, desc[0]->iface, desc[0]->device); @@ -568,7 +609,7 @@ static ssize_t smr_do_ipc(struct smr_ep *ep, struct smr_region *peer_smr, "fallback to using SAR\n"); ofi_buf_free(pend); return smr_do_sar(ep, peer_smr, tx_id, rx_id, op, tag, data, - op_flags, desc, iov, iov_count, + op_flags, smr_flags, desc, iov, iov_count, total_len, context, cmd); } @@ -576,7 +617,7 @@ static ssize_t smr_do_ipc(struct smr_ep *ep, struct smr_region *peer_smr, if (smr_freestack_avail(smr_cmd_stack(ep->region)) <= smr_env.buffer_threshold) - cmd->hdr.op_flags |= SMR_BUFFER_RECV; + cmd->hdr.smr_flags |= SMR_BUFFER_RECV; return FI_SUCCESS; } @@ -696,25 +737,16 @@ static int smr_discard(struct fi_peer_rx_entry *rx_entry) struct smr_unexp_buf *sar_buf; dlist_remove(&cmd_ctx->entry); + while (!slist_empty(&cmd_ctx->buf_list)) { + slist_remove_head_container( + &cmd_ctx->buf_list, + struct smr_unexp_buf, sar_buf, + entry); + ofi_buf_free(sar_buf); + } - switch (cmd_ctx->cmd->hdr.proto) { - case smr_proto_inline: - break; - case smr_proto_sar: - while (!slist_empty(&cmd_ctx->buf_list)) { - slist_remove_head_container( - &cmd_ctx->buf_list, - struct smr_unexp_buf, sar_buf, - entry); - ofi_buf_free(sar_buf); - } - break; - case smr_proto_inject: - case smr_proto_iov: - case smr_proto_ipc: + if (cmd_ctx->cmd->hdr.smr_flags & SMR_RETURN_CMD) smr_return_cmd(cmd_ctx->ep, cmd_ctx->cmd); - break; - } ofi_buf_free(cmd_ctx); diff --git a/prov/shm/src/smr_msg.c b/prov/shm/src/smr_msg.c index 29ca9684f80..18e847f3eae 100644 --- a/prov/shm/src/smr_msg.c +++ b/prov/shm/src/smr_msg.c @@ -85,6 +85,7 @@ static ssize_t smr_generic_sendmsg(struct smr_ep *ep, const struct iovec *iov, int proto; struct smr_cmd_entry *ce; struct smr_cmd *cmd; + uint8_t smr_flags; assert(iov_count <= SMR_IOV_LIMIT); @@ -110,9 +111,8 @@ static ssize_t smr_generic_sendmsg(struct smr_ep *ep, const struct iovec *iov, proto = smr_select_proto(desc, iov_count, smr_vma_enabled(ep, peer_smr), smr_ipc_valid(ep, peer_smr, tx_id, rx_id), op, - total_len, op_flags); - - if (proto != smr_proto_inline) { + total_len, op_flags, &smr_flags); + if (smr_flags & SMR_RETURN_CMD) { if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { smr_cmd_queue_discard(ce, pos); ret = -FI_EAGAIN; @@ -128,17 +128,17 @@ static ssize_t smr_generic_sendmsg(struct smr_ep *ep, const struct iovec *iov, } ret = smr_send_ops[proto](ep, peer_smr, tx_id, rx_id, op, tag, data, - op_flags, (struct ofi_mr **) desc, iov, - iov_count, total_len, context, cmd); + op_flags, smr_flags, (struct ofi_mr **) desc, + iov, iov_count, total_len, context, cmd); if (ret) { smr_cmd_queue_discard(ce, pos); - if (proto != smr_proto_inline) + if (smr_flags & SMR_RETURN_CMD) smr_freestack_push(smr_cmd_stack(ep->region), cmd); goto unlock; } smr_cmd_queue_commit(ce, pos); - if (proto != smr_proto_inline) + if (smr_flags & SMR_RETURN_CMD) goto unlock; ret = smr_complete_tx(ep, context, op, op_flags); @@ -205,6 +205,7 @@ static ssize_t smr_generic_inject(struct fid_ep *ep_fid, const void *buf, int proto; struct smr_cmd_entry *ce; struct smr_cmd *cmd; + uint8_t smr_flags; assert(len <= SMR_INJECT_SIZE); @@ -232,36 +233,20 @@ static ssize_t smr_generic_inject(struct fid_ep *ep_fid, const void *buf, goto unlock; } - if (len <= SMR_MSG_DATA_LEN) { - proto = smr_proto_inline; - cmd = &ce->cmd; - } else { - proto = smr_proto_inject; - if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { - smr_cmd_queue_discard(ce, pos); - ret = -FI_EAGAIN; - goto unlock; - } - - cmd = smr_freestack_pop(smr_cmd_stack(ep->region)); - assert(cmd); - ce->ptr = smr_local_to_peer(ep, peer_smr, tx_id, rx_id, - (uintptr_t) cmd); - } + proto = len <= SMR_MSG_DATA_LEN ? smr_proto_inline : smr_proto_inject; + cmd = &ce->cmd; + smr_flags = (op_flags & FI_REMOTE_CQ_DATA) ? SMR_REMOTE_CQ_DATA : 0; ret = smr_send_ops[proto](ep, peer_smr, tx_id, rx_id, op, tag, data, - op_flags, NULL, &msg_iov, 1, len, NULL, cmd); + op_flags, smr_flags, NULL, &msg_iov, 1, len, + NULL, cmd); if (ret) { - if (proto != smr_proto_inline) - smr_freestack_push(smr_cmd_stack(ep->region), cmd); smr_cmd_queue_discard(ce, pos); ret = -FI_EAGAIN; goto unlock; } smr_cmd_queue_commit(ce, pos); - - if (proto == smr_proto_inline) - ofi_ep_peer_tx_cntr_inc(&ep->util_ep, op); + ofi_ep_peer_tx_cntr_inc(&ep->util_ep, op); unlock: ofi_genlock_unlock(&ep->util_ep.lock); diff --git a/prov/shm/src/smr_progress.c b/prov/shm/src/smr_progress.c index 3c126151f86..0146ef48ca9 100644 --- a/prov/shm/src/smr_progress.c +++ b/prov/shm/src/smr_progress.c @@ -46,7 +46,8 @@ static void smr_progress_overflow(struct smr_ep *ep) entry = ep->overflow_list.head; while (entry) { - cmd = (struct smr_cmd *) entry; + cmd = container_of(container_of(entry, struct smr_cmd_hdr, + entry), struct smr_cmd, hdr); peer_smr = smr_peer_region(ep, cmd->hdr.tx_id); ret = smr_cmd_queue_next(smr_cmd_queue(peer_smr), &ce, &pos); if (ret == -FI_ENOENT) @@ -75,8 +76,7 @@ void smr_free_sar_bufs(struct smr_ep *ep, struct smr_cmd *cmd, int i; for (i = cmd->data.buf_batch_size - 1; i >= 0; i--) { - smr_freestack_push_by_index(smr_sar_pool(ep->region), - cmd->data.sar[i]); + smr_return_sar_buf_by_index(ep->region, cmd->data.sar[i]); } smr_peer_data(ep->region)[cmd->hdr.tx_id].sar_status = SMR_SAR_FREE; } @@ -87,6 +87,7 @@ static int smr_progress_return_entry(struct smr_ep *ep, struct smr_cmd *cmd, struct smr_inject_buf *tx_buf = NULL; uint8_t *src; ssize_t hmem_copy_ret; + struct smr_region *peer_smr; int ret = FI_SUCCESS; switch (cmd->hdr.proto) { @@ -96,9 +97,9 @@ static int smr_progress_return_entry(struct smr_ep *ep, struct smr_cmd *cmd, assert(pend->mr[0]); break; case smr_proto_sar: - if (cmd->hdr.status) { + if (cmd->hdr.smr_flags & SMR_OP_ERROR) { smr_free_sar_bufs(ep, cmd, pend); - return cmd->hdr.status; + return -FI_EIO; } if (cmd->hdr.op == ofi_op_read_req) { @@ -108,6 +109,7 @@ static int smr_progress_return_entry(struct smr_ep *ep, struct smr_cmd *cmd, if (ret && ret != -FI_EBUSY) return ret; + if (pend->bytes_done == cmd->hdr.size) { smr_free_sar_bufs(ep, cmd, pend); return FI_SUCCESS; @@ -132,33 +134,33 @@ static int smr_progress_return_entry(struct smr_ep *ep, struct smr_cmd *cmd, smr_try_send_cmd(ep, cmd); return -FI_EAGAIN; case smr_proto_inject: - tx_buf = smr_get_inject_buf(ep->region, cmd); - if (pend) { - if (pend->bytes_done != cmd->hdr.size && - cmd->hdr.op != ofi_op_atomic) { - src = cmd->hdr.op == ofi_op_atomic_compare ? - tx_buf->buf : tx_buf->data; - hmem_copy_ret = ofi_copy_to_mr_iov( - pend->mr, pend->iov, - pend->iov_count, - 0, src, cmd->hdr.size); - - if (hmem_copy_ret < 0) { - FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "RMA read/fetch failed " - "with code %d\n", - (int)(-hmem_copy_ret)); - ret = hmem_copy_ret; - } else if (hmem_copy_ret != cmd->hdr.size) { - FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "Incomplete rma read/fetch " - "buffer copied\n"); - ret = -FI_ETRUNC; - } else { - pend->bytes_done = - (size_t) hmem_copy_ret; - } + assert(pend); + if (pend->bytes_done != cmd->hdr.size && + cmd->hdr.op != ofi_op_atomic) { + peer_smr = smr_peer_region(ep, cmd->hdr.tx_id); + tx_buf = smr_freestack_get_entry_from_index( + smr_inject_pool(peer_smr), + cmd->data.inject_buf_index); + src = cmd->hdr.op == ofi_op_atomic_compare ? + tx_buf->buf : tx_buf->data; + hmem_copy_ret = ofi_copy_to_mr_iov( + pend->mr, pend->iov, + pend->iov_count, + 0, src, cmd->hdr.size); + + if (hmem_copy_ret < 0) { + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "RMA read/fetch failed " + "with code %d\n", + (int)(-hmem_copy_ret)); + ret = hmem_copy_ret; + } else if (hmem_copy_ret != cmd->hdr.size) { + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "Incomplete rma read/fetch " + "buffer copied\n"); + ret = -FI_ETRUNC; } + smr_return_inject_buf(peer_smr, tx_buf); } break; default: @@ -188,27 +190,25 @@ static void smr_progress_return(struct smr_ep *ep) ret = smr_progress_return_entry(ep, cmd, pending); if (ret != -FI_EAGAIN) { - if (pending) { - if (cmd->hdr.status) { - ret = smr_write_err_comp( - ep->util_ep.tx_cq, - pending->comp_ctx, - pending->comp_flags, - cmd->hdr.tag, - cmd->hdr.status); - } else { - ret = smr_complete_tx( - ep, pending->comp_ctx, - cmd->hdr.op, - pending->comp_flags); - } - if (ret) { - FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "unable to process " - "tx completion\n"); - } - ofi_buf_free(pending); + assert(pending); + if (cmd->hdr.smr_flags & SMR_OP_ERROR) { + ret = smr_write_err_comp( + ep->util_ep.tx_cq, + pending->comp_ctx, + pending->comp_flags, + cmd->hdr.tag, -FI_EIO); + } else { + ret = smr_complete_tx( + ep, pending->comp_ctx, + cmd->hdr.op, + pending->comp_flags); } + if (ret) { + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "unable to process " + "tx completion\n"); + } + ofi_buf_free(pending); smr_freestack_push(smr_cmd_stack(ep->region), cmd); } smr_return_queue_release(smr_return_queue(ep->region), @@ -242,33 +242,34 @@ static ssize_t smr_progress_inject(struct smr_ep *ep, struct smr_cmd *cmd, struct ofi_mr **mr, struct iovec *iov, size_t iov_count) { - struct smr_region *peer_smr; struct smr_inject_buf *tx_buf; ssize_t ret; - peer_smr = smr_peer_region(ep, cmd->hdr.rx_id); - tx_buf = smr_get_inject_buf(peer_smr, cmd); - - if (cmd->hdr.op == ofi_op_read_req) { - ret = ofi_copy_from_mr_iov(tx_buf->data, cmd->hdr.size, mr, - iov, iov_count, 0); - } else { + tx_buf = smr_freestack_get_entry_from_index(smr_inject_pool(ep->region), + cmd->data.inject_buf_index); + if (cmd->hdr.op != ofi_op_read_req) { ret = ofi_copy_to_mr_iov(mr, iov, iov_count, 0, tx_buf->data, cmd->hdr.size); + smr_return_inject_buf(ep->region, tx_buf); + } else { + ret = ofi_copy_from_mr_iov(tx_buf->data, cmd->hdr.size, mr, + iov, iov_count, 0); } if (ret < 0) { FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "inject recv failed with code %lu\n", ret); + cmd->hdr.smr_flags |= SMR_OP_ERROR; } else if (ret != cmd->hdr.size) { FI_WARN(&smr_prov, FI_LOG_EP_CTRL, - "inject recv truncated\n"); + "inject recv truncated. Expected size %zu copied size " + "%zu\n", cmd->hdr.size, ret); ret = -FI_ETRUNC; + cmd->hdr.smr_flags |= SMR_OP_ERROR; } else { ret = FI_SUCCESS; } - cmd->hdr.status = ret; return ret; } @@ -289,7 +290,9 @@ static ssize_t smr_progress_iov(struct smr_ep *ep, struct smr_cmd *cmd, cmd->data.iov_count, cmd->hdr.size, peer_smr->pid, cmd->hdr.op == ofi_op_read_req, xpmem); - cmd->hdr.status = ret; + if (ret) + cmd->hdr.smr_flags |= SMR_OP_ERROR; + return ret; } @@ -344,7 +347,7 @@ static ssize_t smr_try_copy_rx_sar(struct smr_ep *ep, if (ret == -FI_EAGAIN) dlist_insert_tail(&pend->entry, &ep->async_cpy_list); else if (ret != -FI_EBUSY) - pend->cmd->hdr.status = ret; + pend->cmd->hdr.smr_flags |= SMR_OP_ERROR; } return ret; } @@ -361,7 +364,7 @@ static int smr_progress_pending_sar(struct smr_ep *ep, struct smr_cmd *cmd) dlist_insert_tail(&pend->entry, &ep->async_cpy_list); return FI_SUCCESS; } - cmd->hdr.status = ret; + cmd->hdr.smr_flags |= SMR_OP_ERROR; goto out; } @@ -369,13 +372,13 @@ static int smr_progress_pending_sar(struct smr_ep *ep, struct smr_cmd *cmd) if (ret == -FI_EBUSY || ret == -FI_EAGAIN) return FI_SUCCESS; - if (pend->bytes_done == cmd->hdr.size || pend->cmd->hdr.status) { - if (pend->cmd->hdr.status) { + if (pend->bytes_done == cmd->hdr.size || + pend->cmd->hdr.smr_flags & SMR_OP_ERROR) { + if (pend->cmd->hdr.smr_flags & SMR_OP_ERROR) { ret = smr_write_err_comp(ep->util_ep.rx_cq, pend->comp_ctx, pend->comp_flags, - cmd->hdr.tag, - pend->cmd->hdr.status); + cmd->hdr.tag, -FI_EIO); } else { ret = smr_complete_rx(ep, pend->comp_ctx, cmd->hdr.op, @@ -419,10 +422,10 @@ static void smr_init_rx_pend(struct smr_pend_entry *pend, struct smr_cmd *cmd, if (rx_entry) { pend->comp_ctx = rx_entry->context; pend->comp_flags = smr_rx_cq_flags(rx_entry->flags, - cmd->hdr.op_flags); + cmd->hdr.smr_flags); } else { pend->comp_ctx = NULL; - pend->comp_flags = smr_rx_cq_flags(0, cmd->hdr.op_flags); + pend->comp_flags = smr_rx_cq_flags(0, cmd->hdr.smr_flags); } pend->cmd = cmd; @@ -467,7 +470,7 @@ static ssize_t smr_progress_sar(struct smr_ep *ep, struct smr_cmd *cmd, ret = smr_try_copy_rx_sar(ep, pend); - if (pend->bytes_done == cmd->hdr.size || pend->cmd->hdr.status) { + if (pend->bytes_done == cmd->hdr.size) { cmd->hdr.rx_ctx = 0; ofi_buf_free(pend); ret = FI_SUCCESS; @@ -572,7 +575,9 @@ static ssize_t smr_progress_ipc(struct smr_ep *ep, struct smr_cmd *cmd, uncache: ofi_mr_cache_delete(domain->ipc_cache, mr_entry); out: - cmd->hdr.status = ret; + if (ret) + cmd->hdr.smr_flags |= SMR_OP_ERROR; + return ret; } @@ -591,7 +596,7 @@ static smr_progress_func smr_progress_ops[smr_proto_max] = { static void smr_do_atomic(struct smr_cmd *cmd, void *src, struct ofi_mr *dst_mr, void *dst, void *cmp, enum fi_datatype datatype, - enum fi_op op, size_t cnt, uint16_t flags) + enum fi_op op, size_t cnt) { char tmp_result[SMR_INJECT_SIZE]; char tmp_dst[SMR_INJECT_SIZE]; @@ -647,7 +652,7 @@ static int smr_progress_inline_atomic(struct smr_cmd *cmd, struct ofi_mr **mr, for (i = *len = 0; i < ioc_count && *len < cmd->hdr.size; i++) { smr_do_atomic(cmd, &src[*len], mr[i], ioc[i].addr, NULL, cmd->hdr.datatype, cmd->hdr.atomic_op, - ioc[i].count, cmd->hdr.op_flags); + ioc[i].count); *len += ioc[i].count * ofi_datatype_size(cmd->hdr.datatype); } @@ -667,7 +672,8 @@ static int smr_progress_inject_atomic(struct smr_cmd *cmd, struct ofi_mr **mr, uint8_t *src, *comp; int i; - tx_buf = smr_get_inject_buf(smr_peer_region(ep, cmd->hdr.rx_id), cmd); + tx_buf = smr_freestack_get_entry_from_index(smr_inject_pool(ep->region), + cmd->data.inject_buf_index); switch (cmd->hdr.op) { case ofi_op_atomic_compare: @@ -683,11 +689,11 @@ static int smr_progress_inject_atomic(struct smr_cmd *cmd, struct ofi_mr **mr, for (i = *len = 0; i < ioc_count && *len < cmd->hdr.size; i++) { smr_do_atomic(cmd, &src[*len], mr[i], ioc[i].addr, comp ? &comp[*len] : NULL, cmd->hdr.datatype, - cmd->hdr.atomic_op, ioc[i].count, - cmd->hdr.op_flags); + cmd->hdr.atomic_op, ioc[i].count); *len += ioc[i].count * ofi_datatype_size(cmd->hdr.datatype); } + smr_return_inject_buf(ep->region, tx_buf); if (*len != cmd->hdr.size) { FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "recv truncated"); return -FI_ETRUNC; @@ -697,25 +703,24 @@ static int smr_progress_inject_atomic(struct smr_cmd *cmd, struct ofi_mr **mr, } static int smr_start_common(struct smr_ep *ep, struct smr_cmd *cmd, + struct smr_cmd_hdr *hdr, struct fi_peer_rx_entry *rx_entry) { struct smr_pend_entry *pend; uint64_t comp_flags; void *comp_buf; int ret; - bool return_cmd = cmd->hdr.proto != smr_proto_inline; + bool return_cmd = hdr->smr_flags & SMR_RETURN_CMD; rx_entry->peer_context = NULL; - assert (cmd->hdr.proto < smr_proto_max); - ret = smr_progress_ops[cmd->hdr.proto]( + assert (hdr->proto < smr_proto_max); + ret = smr_progress_ops[hdr->proto]( ep, cmd, rx_entry, (struct ofi_mr **) rx_entry->desc, rx_entry->iov, rx_entry->count); - if (!cmd->hdr.rx_ctx) { comp_buf = rx_entry->iov[0].iov_base; - comp_flags = smr_rx_cq_flags(rx_entry->flags, - cmd->hdr.op_flags); + comp_flags = smr_rx_cq_flags(rx_entry->flags, hdr->smr_flags); if (ret) { FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "error processing op\n"); @@ -725,17 +730,17 @@ static int smr_start_common(struct smr_ep *ep, struct smr_cmd *cmd, ret); } else { ret = smr_complete_rx(ep, rx_entry->context, - cmd->hdr.op, comp_flags, - cmd->hdr.size, comp_buf, - cmd->hdr.rx_id, cmd->hdr.tag, - cmd->hdr.cq_data); + hdr->op, comp_flags, + hdr->size, comp_buf, + hdr->rx_id, hdr->tag, + hdr->cq_data); } if (ret) { FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "unable to process rx completion\n"); } ep->srx->owner_ops->free_entry(rx_entry); - } else if (cmd->hdr.proto == smr_proto_sar) { + } else if (hdr->proto == smr_proto_sar) { pend = (struct smr_pend_entry *) cmd->hdr.rx_ctx; if (pend->sar_copy_fn == &smr_dsa_copy_sar) return_cmd = false; @@ -756,7 +761,7 @@ static int smr_copy_saved(struct smr_cmd_ctx *cmd_ctx, int ret; comp_flags = smr_rx_cq_flags(rx_entry->flags, - cmd_ctx->cmd->hdr.op_flags); + cmd_ctx->cmd->hdr.smr_flags); while (!slist_empty(&cmd_ctx->buf_list)) { slist_remove_head_container(&cmd_ctx->buf_list, struct smr_unexp_buf, buf, entry); @@ -814,11 +819,11 @@ int smr_unexp_start(struct fi_peer_rx_entry *rx_entry) int ret = FI_SUCCESS; dlist_remove(&cmd_ctx->entry); - - if (cmd_ctx->cmd->hdr.op_flags & SMR_BUFFER_RECV) + if (cmd_ctx->cmd->hdr.smr_flags & SMR_BUFFER_RECV) ret = smr_copy_saved(cmd_ctx, rx_entry); else - ret = smr_start_common(cmd_ctx->ep, cmd_ctx->cmd, rx_entry); + ret = smr_start_common(cmd_ctx->ep, cmd_ctx->cmd, + &cmd_ctx->cmd->hdr, rx_entry); ofi_buf_free(cmd_ctx); rx_entry->peer_context = NULL; @@ -885,34 +890,33 @@ static int smr_unexp_inline(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, static int smr_unexp_inject(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, struct smr_cmd *cmd) { - struct smr_region *peer_smr; - struct smr_unexp_buf *buf; struct smr_inject_buf *tx_buf; - int ret = FI_SUCCESS; - - if (!(cmd->hdr.op_flags & SMR_BUFFER_RECV)) { - cmd_ctx->cmd = cmd; - return FI_SUCCESS; - } + struct smr_unexp_buf *buf; - peer_smr = smr_peer_region(ep, cmd_ctx->cmd->hdr.rx_id); + cmd->hdr.smr_flags |= SMR_BUFFER_RECV; memcpy(&cmd_ctx->cmd_cpy, cmd, sizeof(cmd_ctx->cmd->hdr)); cmd_ctx->cmd = &cmd_ctx->cmd_cpy; + tx_buf = smr_freestack_get_entry_from_index(smr_inject_pool(ep->region), + cmd->data.inject_buf_index); buf = ofi_buf_alloc(ep->unexp_buf_pool); if (!buf) { - ret = -FI_ENOMEM; - cmd->hdr.status = ret; - goto out; + FI_WARN(&smr_prov, FI_LOG_EP_CTRL, + "Error allocating buffer\n"); + ofi_buf_free(cmd_ctx); + return -FI_ENOMEM; } - tx_buf = smr_get_inject_buf(peer_smr, cmd); - memcpy(buf->buf, tx_buf->buf, cmd_ctx->cmd->hdr.size); + memcpy(buf->buf, tx_buf->data, cmd->hdr.size); + smr_return_inject_buf(ep->region, tx_buf); + slist_init(&cmd_ctx->buf_list); slist_insert_tail(&buf->entry, &cmd_ctx->buf_list); -out: - smr_return_cmd(ep, cmd); - return ret; + + if (cmd->hdr.smr_flags & SMR_RETURN_CMD) + smr_return_cmd(ep, cmd); + + return FI_SUCCESS; } static int smr_unexp_iov(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, @@ -923,7 +927,7 @@ static int smr_unexp_iov(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, struct iovec iov; int ret = FI_SUCCESS; - if (!(cmd->hdr.op_flags & SMR_BUFFER_RECV)) { + if (!(cmd->hdr.smr_flags & SMR_BUFFER_RECV)) { cmd_ctx->cmd = cmd; return FI_SUCCESS; } @@ -956,7 +960,9 @@ static int smr_unexp_iov(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, slist_insert_tail(&buf->entry, &cmd_ctx->buf_list); } out: - cmd->hdr.status = ret; + if (ret) + cmd->hdr.smr_flags |= SMR_OP_ERROR; + smr_return_cmd(ep, cmd); return ret; } @@ -968,35 +974,35 @@ static int smr_unexp_sar(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, struct smr_pend_entry *sar_entry; int ret; - cmd->hdr.op_flags |= SMR_BUFFER_RECV; + cmd->hdr.smr_flags |= SMR_BUFFER_RECV; cmd_ctx->cmd = &cmd_ctx->cmd_cpy; memcpy(&cmd_ctx->cmd_cpy, cmd, sizeof(cmd->hdr) + sizeof(cmd->data)); - if (cmd->hdr.size) { - sar_entry = ofi_buf_alloc(ep->pend_pool); - if (!sar_entry) { - ofi_buf_free(cmd_ctx); - ret = -FI_ENOMEM; - cmd->hdr.status = ret; - goto out; - } - cmd->hdr.rx_ctx = (uintptr_t) sar_entry; + assert(cmd->hdr.size); + sar_entry = ofi_buf_alloc(ep->pend_pool); + if (!sar_entry) { + ofi_buf_free(cmd_ctx); + ret = -FI_ENOMEM; + cmd->hdr.smr_flags |= SMR_OP_ERROR; + goto out; + } + cmd->hdr.rx_ctx = (uintptr_t) sar_entry; - smr_init_rx_pend(sar_entry, cmd, rx_entry, NULL, NULL, 0); - cmd_ctx->pend = sar_entry; + smr_init_rx_pend(sar_entry, cmd, rx_entry, NULL, NULL, 0); + cmd_ctx->pend = sar_entry; - ret = smr_buffer_sar(ep, sar_entry, - sar_entry->rx_entry->peer_context); - if (ret == -FI_EAGAIN) { - dlist_insert_tail(&sar_entry->entry, - &ep->async_cpy_list); - return FI_SUCCESS; - } - cmd->hdr.status = ret; + ret = smr_buffer_sar(ep, sar_entry, + sar_entry->rx_entry->peer_context); + if (ret == -FI_EAGAIN) { + dlist_insert_tail(&sar_entry->entry, + &ep->async_cpy_list); + return FI_SUCCESS; } + cmd->hdr.smr_flags |= SMR_OP_ERROR; out: + assert(cmd->hdr.smr_flags & SMR_RETURN_CMD); smr_return_cmd(ep, cmd); return FI_SUCCESS; } @@ -1010,7 +1016,7 @@ static int smr_unexp_ipc(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, struct iovec iov; int ret = FI_SUCCESS;; - if (!(cmd->hdr.op_flags & SMR_BUFFER_RECV)) { + if (!(cmd->hdr.smr_flags & SMR_BUFFER_RECV)) { cmd_ctx->cmd = cmd; return FI_SUCCESS; } @@ -1051,7 +1057,9 @@ static int smr_unexp_ipc(struct smr_ep *ep, struct smr_cmd_ctx *cmd_ctx, cmd_ctx->cmd->hdr.size = total_size; out: - cmd->hdr.status = ret; + if (ret) + cmd->hdr.smr_flags |= SMR_OP_ERROR; + smr_return_cmd(ep, cmd); return ret; } @@ -1088,7 +1096,7 @@ static int smr_alloc_cmd_ctx(struct smr_ep *ep, slist_init(&cmd_ctx->buf_list); rx_entry->msg_size = cmd->hdr.size; - if (cmd->hdr.op_flags & SMR_REMOTE_CQ_DATA) { + if (cmd->hdr.smr_flags & SMR_REMOTE_CQ_DATA) { rx_entry->flags |= FI_REMOTE_CQ_DATA; rx_entry->cq_data = cmd->hdr.cq_data; } @@ -1105,7 +1113,8 @@ static int smr_alloc_cmd_ctx(struct smr_ep *ep, return FI_SUCCESS; } -static int smr_progress_cmd_msg(struct smr_ep *ep, struct smr_cmd *cmd) +static int smr_progress_cmd_msg(struct smr_ep *ep, struct smr_cmd *cmd, + struct smr_cmd_hdr *hdr) { struct fi_peer_match_attr attr; struct fi_peer_rx_entry *rx_entry; @@ -1114,10 +1123,10 @@ static int smr_progress_cmd_msg(struct smr_ep *ep, struct smr_cmd *cmd) if (cmd->hdr.rx_ctx) return smr_progress_pending(ep, cmd); - attr.addr = ep->map->peers[cmd->hdr.rx_id].fiaddr; - attr.msg_size = cmd->hdr.size; - attr.tag = cmd->hdr.tag; - if (cmd->hdr.op == ofi_op_tagged) { + attr.addr = ep->map->peers[hdr->rx_id].fiaddr; + attr.msg_size = hdr->size; + attr.tag = hdr->tag; + if (hdr->op == ofi_op_tagged) { ret = ep->srx->owner_ops->get_tag(ep->srx, &attr, &rx_entry); if (ret == -FI_ENOENT) { ret = smr_alloc_cmd_ctx(ep, rx_entry, cmd); @@ -1154,7 +1163,7 @@ static int smr_progress_cmd_msg(struct smr_ep *ep, struct smr_cmd *cmd) FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "Error getting rx_entry\n"); return ret; } - ret = smr_start_common(ep, cmd, rx_entry); + ret = smr_start_common(ep, cmd, hdr, rx_entry); out: return ret < 0 ? ret : 0; @@ -1169,7 +1178,7 @@ static int smr_progress_cmd_rma(struct smr_ep *ep, struct smr_cmd *cmd) int ret = 0; struct ofi_mr *mr[SMR_IOV_LIMIT]; struct smr_pend_entry *pend; - bool return_cmd = cmd->hdr.proto != smr_proto_inline; + bool return_cmd = cmd->hdr.smr_flags & SMR_RETURN_CMD; if (cmd->hdr.rx_ctx) return smr_progress_pending(ep, cmd); @@ -1213,11 +1222,11 @@ static int smr_progress_cmd_rma(struct smr_ep *ep, struct smr_cmd *cmd) FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "error processing rma op\n"); ret = smr_write_err_comp(ep->util_ep.rx_cq, NULL, - smr_rx_cq_flags(0, cmd->hdr.op_flags), + smr_rx_cq_flags(0, cmd->hdr.smr_flags), 0, ret); } else { ret = smr_complete_rx(ep, NULL, cmd->hdr.op, - smr_rx_cq_flags(0, cmd->hdr.op_flags), + smr_rx_cq_flags(0, cmd->hdr.smr_flags), cmd->hdr.size, iov_count ? iov[0].iov_base : NULL, cmd->hdr.rx_id, 0, cmd->hdr.cq_data); @@ -1281,18 +1290,18 @@ static int smr_progress_cmd_atomic(struct smr_ep *ep, struct smr_cmd *cmd) "unidentified operation type\n"); err = -FI_EINVAL; } - cmd->hdr.status = -err; if (err) { + cmd->hdr.smr_flags |= SMR_OP_ERROR; FI_WARN(&smr_prov, FI_LOG_EP_CTRL, "error processing atomic op\n"); ret = smr_write_err_comp(ep->util_ep.rx_cq, NULL, smr_rx_cq_flags(0, - cmd->hdr.op_flags), 0, err); + cmd->hdr.smr_flags), 0, err); } else { ret = smr_complete_rx(ep, NULL, cmd->hdr.op, smr_rx_cq_flags(0, - cmd->hdr.op_flags), total_len, + cmd->hdr.smr_flags), total_len, ioc_count ? ioc[0].addr : NULL, cmd->hdr.rx_id, 0, cmd->hdr.cq_data); } @@ -1303,7 +1312,7 @@ static int smr_progress_cmd_atomic(struct smr_ep *ep, struct smr_cmd *cmd) } out: - if (cmd->hdr.proto != smr_proto_inline) + if (cmd->hdr.smr_flags & SMR_RETURN_CMD) smr_return_cmd(ep, cmd); return err; } @@ -1311,6 +1320,7 @@ static int smr_progress_cmd_atomic(struct smr_ep *ep, struct smr_cmd *cmd) static void smr_progress_cmd(struct smr_ep *ep) { struct smr_cmd_entry *ce; + struct smr_cmd_hdr hdr; struct smr_cmd *cmd; int ret = 0; int64_t pos; @@ -1321,10 +1331,12 @@ static void smr_progress_cmd(struct smr_ep *ep) break; cmd = (struct smr_cmd *) ce->ptr; - switch (cmd->hdr.op) { + hdr = cmd->hdr; + + switch (hdr.op) { case ofi_op_msg: case ofi_op_tagged: - ret = smr_progress_cmd_msg(ep, cmd); + ret = smr_progress_cmd_msg(ep, cmd, &hdr); break; case ofi_op_write: case ofi_op_read_req: @@ -1332,7 +1344,7 @@ static void smr_progress_cmd(struct smr_ep *ep) break; case ofi_op_write_async: case ofi_op_read_async: - ofi_ep_peer_rx_cntr_inc(&ep->util_ep, cmd->hdr.op); + ofi_ep_peer_rx_cntr_inc(&ep->util_ep, hdr.op); break; case ofi_op_atomic: case ofi_op_atomic_fetch: @@ -1403,9 +1415,11 @@ static void smr_progress_async_sar(struct smr_ep *ep, if (pend->rx_entry && pend->rx_entry->peer_context) { ret = smr_buffer_sar(ep, pend, pend->rx_entry->peer_context); - if (ret == -FI_EAGAIN) + if (ret == -FI_EAGAIN) { return; - pend->cmd->hdr.status = ret; + } else if (ret != -FI_EAGAIN) { + pend->cmd->hdr.smr_flags |= SMR_OP_ERROR; + } dlist_remove(&pend->entry); smr_return_cmd(ep, pend->cmd); return; @@ -1422,7 +1436,7 @@ static void smr_progress_async_sar(struct smr_ep *ep, dlist_remove(&pend->entry); return; } - pend->cmd->hdr.status = ret; + pend->cmd->hdr.smr_flags |= SMR_OP_ERROR; } } diff --git a/prov/shm/src/smr_rma.c b/prov/shm/src/smr_rma.c index 5c191e7d36c..fd63b65141a 100644 --- a/prov/shm/src/smr_rma.c +++ b/prov/shm/src/smr_rma.c @@ -46,7 +46,7 @@ static void smr_format_rma_resp(struct smr_cmd *cmd, int64_t peer_id, size_t total_len, uint32_t op, uint64_t op_flags) { - smr_generic_format(cmd, 0, peer_id, op, 0, 0, op_flags); + smr_generic_format(cmd, 0, peer_id, op, 0, 0, op_flags, 0); cmd->hdr.size = total_len; } @@ -128,12 +128,13 @@ static ssize_t smr_generic_rma( { struct smr_region *peer_smr; int64_t tx_id, rx_id; - int proto = smr_proto_inline; + int proto; ssize_t ret = -FI_EAGAIN; size_t total_len; struct smr_cmd_entry *ce; struct smr_cmd *cmd; int64_t pos; + uint8_t smr_flags; assert(iov_count <= SMR_IOV_LIMIT); assert(rma_count <= SMR_IOV_LIMIT); @@ -169,8 +170,8 @@ static ssize_t smr_generic_rma( proto = smr_select_proto(desc, iov_count, smr_vma_enabled(ep, peer_smr), smr_ipc_valid(ep, peer_smr, tx_id, rx_id), op, - total_len, op_flags); - if (proto != smr_proto_inline) { + total_len, op_flags, &smr_flags); + if (smr_flags & SMR_RETURN_CMD) { if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { smr_cmd_queue_discard(ce, pos); ret = -FI_EAGAIN; @@ -184,10 +185,10 @@ static ssize_t smr_generic_rma( cmd = &ce->cmd; } ret = smr_send_ops[proto](ep, peer_smr, tx_id, rx_id, op, 0, data, - op_flags, (struct ofi_mr **)desc, iov, - iov_count, total_len, context, cmd); + op_flags, smr_flags, (struct ofi_mr **)desc, + iov, iov_count, total_len, context, cmd); if (ret) { - if (proto != smr_proto_inline) + if (smr_flags & SMR_RETURN_CMD) smr_freestack_push(smr_cmd_stack(ep->region), cmd); smr_cmd_queue_discard(ce, pos); goto unlock; @@ -196,7 +197,7 @@ static ssize_t smr_generic_rma( smr_add_rma_cmd(peer_smr, rma_iov, rma_count, cmd); smr_cmd_queue_commit(ce, pos); - if (proto != smr_proto_inline || op == ofi_op_read_req) + if (smr_flags & SMR_RETURN_CMD) goto unlock; ret = smr_complete_tx(ep, context, op, op_flags); @@ -320,7 +321,7 @@ static ssize_t smr_writemsg(struct fid_ep *ep_fid, const struct fi_msg_rma *msg, static ssize_t smr_generic_rma_inject( struct fid_ep *ep_fid, const void *buf, size_t len, fi_addr_t dest_addr, uint64_t addr, uint64_t key, uint64_t data, - uint64_t flags) + uint64_t op_flags) { struct smr_ep *ep; struct smr_region *peer_smr; @@ -332,6 +333,7 @@ static ssize_t smr_generic_rma_inject( struct smr_cmd *cmd; struct smr_cmd_entry *ce; int64_t pos; + uint8_t smr_flags; assert(len <= SMR_INJECT_SIZE); ep = container_of(ep_fid, struct smr_ep, util_ep.ep_fid.fid); @@ -359,36 +361,21 @@ static ssize_t smr_generic_rma_inject( goto unlock; } - if (len <= SMR_MSG_DATA_LEN) { - proto = smr_proto_inline; - cmd = &ce->cmd; - } else { - proto = smr_proto_inject; - if (smr_freestack_isempty(smr_cmd_stack(ep->region))) { - smr_cmd_queue_discard(ce, pos); - ret = -FI_EAGAIN; - goto unlock; - } - - cmd = smr_freestack_pop(smr_cmd_stack(ep->region)); - assert(cmd); - ce->ptr = smr_local_to_peer(ep, peer_smr, tx_id, rx_id, - (uintptr_t) cmd); - } + proto = len <= SMR_MSG_DATA_LEN ? smr_proto_inline : smr_proto_inject; + cmd = &ce->cmd; + smr_flags = (op_flags & FI_REMOTE_CQ_DATA) ? SMR_REMOTE_CQ_DATA : 0; ret = smr_send_ops[proto](ep, peer_smr, tx_id, rx_id, ofi_op_write, 0, - data, flags, NULL, &iov, 1, len, NULL, cmd); + data, op_flags, smr_flags, NULL, &iov, 1, len, + NULL, cmd); if (ret) { - if (proto != smr_proto_inline) - smr_freestack_push(smr_cmd_stack(ep->region), cmd); smr_cmd_queue_discard(ce, pos); goto unlock; } smr_add_rma_cmd(peer_smr, &rma_iov, 1, cmd); smr_cmd_queue_commit(ce, pos); - if (proto == smr_proto_inline) - ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_write); + ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_write); unlock: ofi_genlock_unlock(&ep->util_ep.lock); return ret; diff --git a/prov/shm/src/smr_util.c b/prov/shm/src/smr_util.c index 71340bdf1a8..0c3feeae688 100644 --- a/prov/shm/src/smr_util.c +++ b/prov/shm/src/smr_util.c @@ -37,6 +37,7 @@ struct dlist_entry ep_name_list; DEFINE_LIST(ep_name_list); pthread_mutex_t ep_list_lock = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t inject_pool_lock = PTHREAD_MUTEX_INITIALIZER; void smr_cleanup(void) { @@ -102,13 +103,13 @@ size_t smr_calculate_size_offsets(size_t tx_count, size_t rx_count, /* Align cmd_queue offset to cache line */ cmd_queue_offset = ofi_get_aligned_size(sizeof(struct smr_region), 64); - cmd_stack_offset = cmd_queue_offset + sizeof(struct smr_cmd_queue) + - sizeof(struct smr_cmd_queue_entry) * rx_size; - inject_pool_offset = cmd_stack_offset + - freestack_size(sizeof(struct smr_cmd), tx_size); - ret_queue_offset = inject_pool_offset + sizeof(struct smr_inject_buf) * - tx_size; - ret_queue_offset = ofi_get_aligned_size(ret_queue_offset, 64); + inject_pool_offset = cmd_queue_offset + sizeof(struct smr_cmd_queue) + + sizeof(struct smr_cmd_queue_entry) * rx_size; + cmd_stack_offset = inject_pool_offset + + freestack_size(sizeof(struct smr_inject_buf), + rx_size); + ret_queue_offset = ofi_get_aligned_size(cmd_stack_offset + + freestack_size(sizeof(struct smr_cmd), tx_size), 64); sar_pool_offset = ret_queue_offset + sizeof(struct smr_return_queue) + sizeof(struct smr_return_queue_entry) * tx_size; peer_data_offset = sar_pool_offset + @@ -287,6 +288,8 @@ int smr_create(const struct fi_provider *prov, const struct smr_attr *attr, (*smr)->max_sar_buf_per_peer = SMR_BUF_BATCH_MAX; smr_cmd_queue_init(smr_cmd_queue(*smr), rx_size, smr_cmd_init); + smr_freestack_init(smr_inject_pool(*smr), rx_size, + sizeof(struct smr_inject_buf)); smr_return_queue_init(smr_return_queue(*smr), tx_size, NULL); smr_freestack_init(smr_cmd_stack(*smr), tx_size, @@ -300,6 +303,7 @@ int smr_create(const struct fi_provider *prov, const struct smr_attr *attr, smr_peer_data(*smr)[i].xpmem.avail = false; } + ofi_spin_init(&(*smr)->fs_lock); strncpy((char *) smr_name(*smr), attr->name, total_size - name_offset); /* Must be set last to signal full initialization to peers */ @@ -318,6 +322,10 @@ int smr_create(const struct fi_provider *prov, const struct smr_attr *attr, void smr_free(struct smr_region *smr) { + /* + * TODO: Figure out if we should cleanup resources in the smr before + * unlinking and unmapping. + */ if (smr->flags & SMR_FLAG_HMEM_ENABLED) (void) ofi_hmem_host_unregister(smr); shm_unlink(smr_name(smr)); diff --git a/prov/shm/src/smr_util.h b/prov/shm/src/smr_util.h index 12d6a0af871..ffe6efabb69 100644 --- a/prov/shm/src/smr_util.h +++ b/prov/shm/src/smr_util.h @@ -35,8 +35,8 @@ #include "ofi.h" #include "ofi_atomic_queue.h" +#include "ofi_lock.h" #include "ofi_xpmem.h" -#include #ifdef __cplusplus extern "C" { @@ -74,6 +74,8 @@ extern struct smr_env smr_env; #define SMR_REMOTE_CQ_DATA (1 << 0) #define SMR_BUFFER_RECV (1 << 1) +#define SMR_RETURN_CMD (1 << 2) +#define SMR_OP_ERROR (1 << 3) enum { smr_proto_inline, /* inline payload */ @@ -86,27 +88,36 @@ enum { /* * Unique smr_cmd_hdr for smr message protocol: - * entry for internal use managing commands (must be kept first) - * tx_ctx source side context (unused by target side) - * rx_ctx target side context (unused by source side) - * tx_id local shm_id of peer sending msg (unused by target) - * rx_id remote shm_id of peer sending msg (unused by source) * op type of op (ex. ofi_op_msg, defined in ofi_proto.h) * proto smr protocol (ex. smr_proto_inline, defined above) - * op_flags operation flags (ex. SMR_REMOTE_CQ_DATA, defined above) + * smr_flags operation flags (ex. SMR_REMOTE_CQ_DATA, defined above) + * resv reserved + * tx_id local shm_id of peer sending msg (unused by target) + * rx_id remote shm_id of peer sending msg (unused by source) * size size of data transfer - * status returned status of operation * cq_data remote CQ data * tag tag for FI_TAGGED API only * datatype atomic datatype for FI_ATOMIC API only * atomic_op atomic operation for FI_ATOMIC API only + * status returned status of operation + * CACHE LINE HERE Make sure above fields are 40bytes so that they are + * properly cache aligned with the other 24 bytes + * from the atomic queue fields. This is necessary + (especially if your cpu does not have prefetching) so + that the first cache grab gets the lightweight protocol + fields. + * entry for internal use managing commands + * tx_ctx source side context (unused by target side) + * rx_ctx target side context (unused by source side) */ struct smr_cmd_hdr { - uint64_t entry; - uint64_t tx_ctx; - uint64_t rx_ctx; + uint8_t op; + uint8_t proto; + uint8_t smr_flags; + uint8_t resv[1]; + int16_t rx_id; + int16_t tx_id; uint64_t size; - int64_t status; uint64_t cq_data; union { uint64_t tag; @@ -115,12 +126,11 @@ struct smr_cmd_hdr { uint8_t atomic_op; }; }; - int16_t rx_id; - int16_t tx_id; - uint8_t op; - uint8_t proto; - uint8_t op_flags; - uint8_t resv[1]; + uint64_t proto_data; + //CACHE LINE HERE - See comment above + uint64_t entry; + uint64_t tx_ctx; + uint64_t rx_ctx; }; #ifdef static_assert @@ -145,6 +155,7 @@ struct smr_cmd_rma { struct smr_cmd_data { union { uint8_t msg[SMR_MSG_DATA_LEN]; + uint64_t inject_buf_index; struct { size_t iov_count; struct iovec iov[SMR_IOV_LIMIT]; @@ -244,6 +255,8 @@ struct smr_region { uintptr_t base_addr; size_t total_size; + + ofi_spin_t fs_lock; }; uint8_t pad[SMR_PREFETCH_SZ]; }; @@ -251,8 +264,8 @@ struct smr_region { struct { /* offsets from start of smr_region */ size_t cmd_queue_offset; - size_t cmd_stack_offset; size_t inject_pool_offset; + size_t cmd_stack_offset; size_t ret_queue_offset; size_t sar_pool_offset; size_t peer_data_offset; @@ -314,9 +327,9 @@ static inline struct smr_freestack *smr_cmd_stack(struct smr_region *smr) { return (struct smr_freestack *) ((char *) smr + smr->cmd_stack_offset); } -static inline struct smr_inject_buf *smr_inject_pool(struct smr_region *smr) +static inline struct smr_freestack *smr_inject_pool(struct smr_region *smr) { - return (struct smr_inject_buf *) + return (struct smr_freestack *) ((char *) smr + smr->inject_pool_offset); } static inline struct smr_return_queue *smr_return_queue(struct smr_region *smr) @@ -337,11 +350,32 @@ static inline const char *smr_name(struct smr_region *smr) return (const char *) smr + smr->name_offset; } -static inline struct smr_inject_buf *smr_get_inject_buf(struct smr_region *smr, - struct smr_cmd *cmd) +static inline struct smr_inject_buf *smr_get_inject_buf(struct smr_region *smr) +{ + struct smr_inject_buf *buf; + ofi_spin_lock(&smr->fs_lock); + if (!smr_freestack_isempty(smr_inject_pool(smr))) + buf = smr_freestack_pop(smr_inject_pool(smr)); + else + buf = NULL; + ofi_spin_unlock(&smr->fs_lock); + return buf; +} + +static inline void smr_return_inject_buf(struct smr_region *smr, + struct smr_inject_buf *buf) +{ + ofi_spin_lock(&smr->fs_lock); + smr_freestack_push(smr_inject_pool(smr), buf); + ofi_spin_unlock(&smr->fs_lock); +} + +static inline void smr_return_sar_buf_by_index(struct smr_region *smr, + size_t index) { - return &smr_inject_pool(smr)[smr_freestack_get_index(smr_cmd_stack(smr), - (char *) cmd)]; + ofi_spin_lock(&smr->fs_lock); + smr_freestack_push_by_index(smr_sar_pool(smr), index); + ofi_spin_unlock(&smr->fs_lock); } struct smr_attr {