Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions prov/shm/src/smr.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@
#include "ofi_shm_p2p.h"
#include "ofi_util.h"

/* Per-peer sender-side cache of inject buffers. Avoids taking
 * peer_smr->fs_lock on every inject send: when the cache for a peer is
 * empty, up to SMR_INJECT_CACHE_BATCH buffers are popped from that peer's
 * inject pool under a single lock acquisition, and later sends consume the
 * cached buffers one at a time without locking. */
#define SMR_INJECT_CACHE_BATCH 32

/* One cache slot; struct smr_ep keeps an array of these indexed by peer id. */
struct smr_inject_cache {
/* Buffers popped from the peer's inject pool and not yet handed out.
 * Only entries [0, count) are valid; they are consumed from the end. */
struct smr_inject_buf *bufs[SMR_INJECT_CACHE_BATCH];
/* Number of valid entries currently held in bufs. */
int count;
};

struct smr_ep {
struct util_ep util_ep;
size_t tx_size;
Expand All @@ -61,6 +71,10 @@ struct smr_ep {
enum ofi_shm_p2p_type p2p_type;
void *dsa_context;
void (*smr_progress_async)(struct smr_ep *ep);

/* Placed at the end so hot fields earlier in the struct keep their
* original offsets / cacheline layout. */
struct smr_inject_cache inject_cache[SMR_MAX_PEERS];
};


Expand Down Expand Up @@ -250,6 +264,35 @@ int smr_cntr_open(struct fid_domain *domain, struct fi_cntr_attr *attr,

int64_t smr_verify_peer(struct smr_ep *ep, fi_addr_t fi_addr);

/* Hand out one inject buffer for a send to the peer identified by peer_id.
 *
 * Fast path: ep->inject_cache[peer_id] still holds buffers, so the most
 * recently stored one is returned without touching peer_smr->fs_lock.
 *
 * Slow path: the cache is empty, so peer_smr->fs_lock is taken once and up
 * to SMR_INJECT_CACHE_BATCH buffers are popped from the peer's inject pool
 * into the cache; the last buffer popped is then returned.
 *
 * Returns NULL when the cache is empty and the peer's inject pool had no
 * buffer to give. */
static inline struct smr_inject_buf *
smr_get_inject_buf_cached(struct smr_ep *ep, struct smr_region *peer_smr,
			  int64_t peer_id)
{
	struct smr_inject_cache *slot = &ep->inject_cache[peer_id];
	int filled = 0;

	if (slot->count <= 0) {
		/* Nothing cached: refill in one batch under the peer's
		 * freestack lock, stopping early if the pool runs dry. */
		ofi_spin_lock(&peer_smr->fs_lock);
		while (filled < SMR_INJECT_CACHE_BATCH &&
		       !smr_freestack_isempty(smr_inject_pool(peer_smr))) {
			slot->bufs[filled] =
				smr_freestack_pop(smr_inject_pool(peer_smr));
			filled++;
		}
		ofi_spin_unlock(&peer_smr->fs_lock);

		slot->count = filled;
		if (filled == 0)
			return NULL;
	}

	/* Serve from the tail of the cache. */
	slot->count--;
	return slot->bufs[slot->count];
}


void smr_format_tx_pend(struct smr_pend_entry *pend, struct smr_cmd *cmd,
void *context, struct ofi_mr **mr,
const struct iovec *iov, uint32_t iov_count,
Expand Down
37 changes: 32 additions & 5 deletions prov/shm/src/smr_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -271,24 +271,32 @@ static void smr_format_inline(struct smr_cmd *cmd, struct ofi_mr **mr,
}

static int smr_format_inject(struct smr_ep *ep, struct smr_region *peer_smr,
int64_t peer_id,
struct smr_cmd *cmd, struct ofi_mr **mr,
const struct iovec *iov, uint32_t iov_count,
uint64_t op_flags)
{
struct smr_inject_buf *tx_buf;

cmd->hdr.proto = smr_proto_inject;
tx_buf = smr_get_inject_buf(peer_smr);
tx_buf = smr_get_inject_buf_cached(ep, peer_smr, peer_id);
if (!tx_buf) {
FI_DBG(&smr_prov, FI_LOG_EP_DATA,
"No inject buffers available, cannot send inject "
"message\n");
return -FI_EAGAIN;
}

cmd->data.inject_buf_index = smr_freestack_get_index(
smr_inject_pool(peer_smr),
(char *)tx_buf);
/* sizeof(struct smr_inject_buf) == SMR_INJECT_SIZE == 4096 == 2^12
* is known at compile time, so compute the freestack index with a
* shift instead of calling smr_freestack_get_index() which uses a
* runtime divq (~20-30 cycles on x86, ~50+ cycles on Arm). */
{
struct smr_freestack *ifs = smr_inject_pool(peer_smr);
uint64_t offset = ((char *)tx_buf - (char *)ifs) -
ifs->entry_base_offset;
cmd->data.inject_buf_index = (int16_t)(offset >> 12);
}
if (cmd->hdr.op != ofi_op_read_req)
cmd->hdr.size = ofi_copy_from_mr_iov(tx_buf->data,
SMR_INJECT_SIZE,
Expand Down Expand Up @@ -505,7 +513,7 @@ static ssize_t smr_do_inject(struct smr_ep *ep, struct smr_region *peer_smr,

smr_generic_format(cmd, tx_id, rx_id, op, tag, data, smr_flags,
(uintptr_t) pend);
ret = smr_format_inject(ep, peer_smr, cmd, desc, iov, iov_count,
ret = smr_format_inject(ep, peer_smr, tx_id, cmd, desc, iov, iov_count,
op_flags);
if (ret)
goto err;
Expand Down Expand Up @@ -636,9 +644,28 @@ static int smr_ep_close(struct fid *fid)
struct smr_ep *ep;
struct smr_pend_entry *pend;
struct smr_cmd_ctx *cmd_ctx;
struct smr_inject_cache *cache;
struct smr_region *peer_smr;
int i;

ep = container_of(fid, struct smr_ep, util_ep.ep_fid.fid);

/* Return any cached inject buffers to their peers' freestacks */
for (i = 0; i < SMR_MAX_PEERS; i++) {
cache = &ep->inject_cache[i];
if (cache->count == 0)
continue;
if (!ep->map || !ep->map->peers[i].region)
continue;
peer_smr = ep->map->peers[i].region;
ofi_spin_lock(&peer_smr->fs_lock);
while (cache->count > 0) {
smr_freestack_push(smr_inject_pool(peer_smr),
cache->bufs[--cache->count]);
}
ofi_spin_unlock(&peer_smr->fs_lock);
}

if (smr_env.use_dsa_sar)
smr_dsa_context_cleanup(ep);

Expand Down