Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions prov/shm/src/smr.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ struct smr_ep {
struct slist overflow_list;
struct dlist_entry sar_list;
struct dlist_entry async_cpy_list;
struct dlist_entry unexp_cmd_list;
size_t min_multi_recv_size;

int ep_idx;
Expand Down Expand Up @@ -256,20 +255,21 @@ void smr_format_tx_pend(struct smr_pend_entry *pend, struct smr_cmd *cmd,
uint64_t op_flags);
void smr_generic_format(struct smr_cmd *cmd, int64_t tx_id, int64_t rx_id,
uint32_t op, uint64_t tag, uint64_t data,
uint64_t op_flags);
uint8_t smr_flags);
size_t smr_copy_to_sar(struct smr_ep *ep, struct smr_region *smr,
struct smr_pend_entry *pend);
size_t smr_copy_from_sar(struct smr_ep *ep, struct smr_region *smr,
struct smr_pend_entry *pend);
int smr_select_proto(void **desc, size_t iov_count, bool cma_avail,
bool ipc_valid, uint32_t op, uint64_t total_len,
uint64_t op_flags);
uint64_t op_flags, uint8_t *smr_flags);
typedef ssize_t (*smr_send_func)(
struct smr_ep *ep, struct smr_region *peer_smr,
int64_t tx_id, int64_t rx_id, uint32_t op, uint64_t tag,
uint64_t data, uint64_t op_flags, struct ofi_mr **desc,
const struct iovec *iov, size_t iov_count, size_t total_len,
void *context, struct smr_cmd *cmd);
uint64_t data, uint64_t op_flags, uint8_t smr_flags,
struct ofi_mr **desc, const struct iovec *iov,
size_t iov_count, size_t total_len, void *context,
struct smr_cmd *cmd);
extern smr_send_func smr_send_ops[smr_proto_max];

int smr_write_err_comp(struct util_cq *cq, void *context,
Expand All @@ -280,9 +280,9 @@ int smr_complete_rx(struct smr_ep *ep, void *context, uint32_t op,
uint64_t flags, size_t len, void *buf, int64_t id,
uint64_t tag, uint64_t data);

static inline uint64_t smr_rx_cq_flags(uint64_t rx_flags, uint16_t op_flags)
static inline uint64_t smr_rx_cq_flags(uint64_t rx_flags, uint8_t smr_flags)
{
if (op_flags & SMR_REMOTE_CQ_DATA)
if (smr_flags & SMR_REMOTE_CQ_DATA)
rx_flags |= FI_REMOTE_CQ_DATA;
return rx_flags;
}
Expand Down
105 changes: 56 additions & 49 deletions prov/shm/src/smr_atomic.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static void smr_do_atomic_inline(
smr_format_inline_atomic(cmd, desc, iov, iov_count);
}

static void smr_format_inject_atomic(
static int smr_format_inject_atomic(
struct smr_cmd *cmd, struct ofi_mr **desc,
const struct iovec *iov, size_t count,
const struct iovec *resultv, size_t result_count,
Expand All @@ -80,8 +80,17 @@ static void smr_format_inject_atomic(
size_t comp_size;

cmd->hdr.proto = smr_proto_inject;
tx_buf = smr_get_inject_buf(smr);
if (!tx_buf) {
FI_DBG(&smr_prov, FI_LOG_EP_DATA,
"No inject buffers available, cannot send inject "
"message\n");
return -FI_EAGAIN;
}

tx_buf = smr_get_inject_buf(smr, cmd);
cmd->data.inject_buf_index = smr_freestack_get_index(
smr_inject_pool(smr),
(char *)tx_buf);
switch (cmd->hdr.op) {
case ofi_op_atomic:
cmd->hdr.size = ofi_copy_from_mr_iov(
Expand Down Expand Up @@ -112,9 +121,11 @@ static void smr_format_inject_atomic(
assert(0);
break;
}

return FI_SUCCESS;
}

static ssize_t smr_do_atomic_inject(
static int smr_do_atomic_inject(
struct smr_ep *ep, struct smr_region *peer_smr,
int64_t tx_id, int64_t rx_id, uint32_t op,
uint64_t op_flags, uint8_t datatype, uint8_t atomic_op,
Expand All @@ -123,18 +134,20 @@ static ssize_t smr_do_atomic_inject(
const struct iovec *resultv, size_t result_count,
struct ofi_mr **comp_desc, const struct iovec *compv,
size_t comp_count, size_t total_len, void *context,
uint16_t smr_flags, struct smr_cmd *cmd)
uint8_t smr_flags, struct smr_cmd *cmd)
{
struct smr_pend_entry *pend;
int ret;

smr_generic_format(cmd, tx_id, rx_id, op, 0, 0, op_flags);
smr_generic_format(cmd, tx_id, rx_id, op, 0, 0, smr_flags);
smr_generic_atomic_format(cmd, datatype, atomic_op);
smr_format_inject_atomic(cmd, desc, iov, iov_count, resultv,
result_count, comp_desc, compv, comp_count,
ep->region);
ret = smr_format_inject_atomic(cmd, desc, iov, iov_count, resultv,
result_count, comp_desc, compv,
comp_count, peer_smr);
if (ret)
return ret;

if (op == ofi_op_atomic_fetch || op == ofi_op_atomic_compare ||
atomic_op == FI_ATOMIC_READ || op_flags & FI_DELIVERY_COMPLETE) {
if (smr_flags & SMR_RETURN_CMD) {
pend = ofi_buf_alloc(ep->pend_pool);
assert(pend);
cmd->hdr.tx_ctx = (uintptr_t) pend;
Expand All @@ -148,13 +161,18 @@ static ssize_t smr_do_atomic_inject(
}

static int smr_select_atomic_proto(uint32_t op, uint64_t total_len,
uint64_t op_flags)
uint64_t op_flags, uint8_t *smr_flags)
{
*smr_flags = 0;
assert(!(op_flags & FI_REMOTE_CQ_DATA));
if (op == ofi_op_atomic_compare || op == ofi_op_atomic_fetch ||
op_flags & FI_DELIVERY_COMPLETE || total_len > SMR_MSG_DATA_LEN)
op_flags & FI_DELIVERY_COMPLETE) {
*smr_flags |= SMR_RETURN_CMD;
return smr_proto_inject;
}

return smr_proto_inline;
return total_len > SMR_MSG_DATA_LEN ? smr_proto_inject :
smr_proto_inline;
}

static ssize_t smr_generic_atomic(
Expand All @@ -174,12 +192,12 @@ static ssize_t smr_generic_atomic(
struct iovec iov[SMR_IOV_LIMIT];
struct iovec compare_iov[SMR_IOV_LIMIT];
struct iovec result_iov[SMR_IOV_LIMIT];
uint16_t smr_flags = 0;
int64_t tx_id, rx_id, pos;
int proto;
ssize_t ret;
size_t total_len;
uint64_t atomic_flags;
uint8_t smr_flags;

assert(count <= SMR_IOV_LIMIT);
assert(result_count <= SMR_IOV_LIMIT);
Expand Down Expand Up @@ -248,15 +266,8 @@ static ssize_t smr_generic_atomic(
break;
}

proto = smr_select_atomic_proto(op, total_len, op_flags);

if (proto == smr_proto_inline) {
cmd = &ce->cmd;
smr_do_atomic_inline(ep, peer_smr, tx_id, rx_id, ofi_op_atomic,
op_flags, datatype, atomic_op,
(struct ofi_mr **) desc, iov, count,
total_len, cmd);
} else {
proto = smr_select_atomic_proto(op, total_len, op_flags, &smr_flags);
if (smr_flags & SMR_RETURN_CMD) {
if (smr_freestack_isempty(smr_cmd_stack(ep->region))) {
smr_cmd_queue_discard(ce, pos);
ret = -FI_EAGAIN;
Expand All @@ -266,7 +277,17 @@ static ssize_t smr_generic_atomic(
cmd = smr_freestack_pop(smr_cmd_stack(ep->region));
assert(cmd);
ce->ptr = smr_local_to_peer(ep, peer_smr, tx_id, rx_id,
(uintptr_t) cmd);
(uintptr_t) cmd);
} else {
cmd = &ce->cmd;
}

if (proto == smr_proto_inline) {
smr_do_atomic_inline(ep, peer_smr, tx_id, rx_id, ofi_op_atomic,
op_flags, datatype, atomic_op,
(struct ofi_mr **) desc, iov, count,
total_len, cmd);
} else {
ret = smr_do_atomic_inject(ep, peer_smr, tx_id, rx_id, op,
op_flags, datatype, atomic_op,
(struct ofi_mr **) desc, iov, count,
Expand All @@ -281,16 +302,17 @@ static ssize_t smr_generic_atomic(
}
}

if (!cmd->hdr.tx_ctx) {
ret = smr_complete_tx(ep, context, op, op_flags);
if (ret) {
FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
"unable to process tx completion\n");
}
}

smr_format_rma_ioc(cmd, rma_ioc, rma_count);
smr_cmd_queue_commit(ce, pos);

if (smr_flags & SMR_RETURN_CMD)
goto unlock;

ret = smr_complete_tx(ep, context, op, op_flags);
if (ret)
FI_WARN(&smr_prov, FI_LOG_EP_CTRL,
"unable to process tx completion\n");

unlock:
ofi_genlock_unlock(&ep->util_ep.lock);
return ret;
Expand Down Expand Up @@ -371,7 +393,6 @@ static ssize_t smr_atomic_inject(
int64_t id, peer_id, pos;
ssize_t ret;
size_t total_len;
int proto;

ep = container_of(ep_fid, struct smr_ep, util_ep.ep_fid.fid);

Expand Down Expand Up @@ -412,24 +433,12 @@ static ssize_t smr_atomic_inject(
rma_ioc.count = count;
rma_ioc.key = key;

cmd = &ce->cmd;
if (total_len <= SMR_MSG_DATA_LEN) {
proto = smr_proto_inline;
cmd = &ce->cmd;
smr_do_atomic_inline(ep, peer_smr, id, peer_id, ofi_op_atomic,
0, datatype, op, NULL, &iov, 1, total_len,
&ce->cmd);
} else {
proto = smr_proto_inject;
if (smr_freestack_isempty(smr_cmd_stack(ep->region))) {
smr_cmd_queue_discard(ce, pos);
ret = -FI_EAGAIN;
goto unlock;
}

cmd = smr_freestack_pop(smr_cmd_stack(ep->region));
assert(cmd);
ce->ptr = smr_local_to_peer(ep, peer_smr, id, peer_id,
(uintptr_t) cmd);
ret = smr_do_atomic_inject(ep, peer_smr, id, peer_id,
ofi_op_atomic, 0, datatype, op, NULL,
&iov, 1, NULL, NULL, 0, NULL, NULL,
Expand All @@ -442,9 +451,7 @@ static ssize_t smr_atomic_inject(

smr_format_rma_ioc(cmd, &rma_ioc, 1);
smr_cmd_queue_commit(ce, pos);

if (proto == smr_proto_inline)
ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_atomic);
ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_atomic);
unlock:
ofi_genlock_unlock(&ep->util_ep.lock);
return ret;
Expand Down
Loading
Loading