Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 79 additions & 42 deletions lib/thread/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,9 @@ struct spdk_thread {
struct spdk_ring *messages;
uint8_t num_pp_handlers;
int msg_fd;
SLIST_HEAD(, spdk_msg) msg_cache;
size_t msg_cache_count;
/* Lock-free message pool using SPDK rings (MP/SC) */
struct spdk_ring *msg_pool; /* Per-thread pool for alloc and free */
uint32_t msg_pool_size; /* Pool capacity (SPDK_MSG_MEMPOOL_CACHE_SIZE) */
spdk_msg_fn critical_msg;
uint64_t id;
uint64_t next_poller_id;
Expand Down Expand Up @@ -398,11 +399,25 @@ _thread_lib_init(size_t ctx_sz, size_t msg_mempool_sz)
static void thread_interrupt_destroy(struct spdk_thread *thread);
static int thread_interrupt_create(struct spdk_thread *thread);

/*
 * Empty a message ring and dispose of it.
 *
 * Every message still queued in *ring is handed back to g_spdk_msg_mempool,
 * after which the ring itself is freed and the caller's pointer is cleared.
 * Nothing happens when *ring is already NULL.
 */
static void
thread_drain_msg_ring(struct spdk_ring **ring)
{
	struct spdk_ring *r = *ring;
	struct spdk_msg *entry;

	if (r == NULL) {
		return;
	}

	/* Pull entries one at a time until the ring reports nothing left. */
	for (;;) {
		if (spdk_ring_dequeue(r, (void **)&entry, 1) != 1) {
			break;
		}
		spdk_mempool_put(g_spdk_msg_mempool, entry);
	}

	spdk_ring_free(r);
	*ring = NULL;
}

static void
_free_thread(struct spdk_thread *thread)
{
struct spdk_io_channel *ch;
struct spdk_msg *msg;
struct spdk_poller *poller, *ptmp;

RB_FOREACH(ch, io_channel_tree, &thread->io_channels) {
Expand Down Expand Up @@ -440,18 +455,7 @@ _free_thread(struct spdk_thread *thread)
TAILQ_REMOVE(&g_threads, thread, tailq);
pthread_mutex_unlock(&g_devlist_mutex);

msg = SLIST_FIRST(&thread->msg_cache);
while (msg != NULL) {
SLIST_REMOVE_HEAD(&thread->msg_cache, link);

assert(thread->msg_cache_count > 0);
thread->msg_cache_count--;
spdk_mempool_put(g_spdk_msg_mempool, msg);

msg = SLIST_FIRST(&thread->msg_cache);
}

assert(thread->msg_cache_count == 0);
thread_drain_msg_ring(&thread->msg_pool);

if (spdk_interrupt_mode_is_enabled()) {
thread_interrupt_destroy(thread);
Expand Down Expand Up @@ -551,8 +555,10 @@ spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
TAILQ_INIT(&thread->active_pollers);
RB_INIT(&thread->timed_pollers);
TAILQ_INIT(&thread->paused_pollers);
SLIST_INIT(&thread->msg_cache);
thread->msg_cache_count = 0;

/* Initialize message pool fields */
thread->msg_pool = NULL;
thread->msg_pool_size = SPDK_MSG_MEMPOOL_CACHE_SIZE;

thread->tsc_last = spdk_get_ticks();

Expand All @@ -568,14 +574,30 @@ spdk_thread_create(const char *name, const struct spdk_cpuset *cpumask)
return NULL;
}

/* Fill the local message pool cache. */
/* Create per-thread message pool (MP/SC: any thread may return messages here) */
thread->msg_pool = spdk_ring_create(SPDK_RING_TYPE_MP_SC, SPDK_MSG_MEMPOOL_CACHE_SIZE,
SPDK_ENV_NUMA_ID_ANY);
if (!thread->msg_pool) {
SPDK_ERRLOG("Unable to allocate memory for message pool ring\n");
spdk_ring_free(thread->messages);
free(thread);
return NULL;
}

/* Pre-populate the local message pool from global mempool */
rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
if (rc == 0) {
/* If we can't populate the cache it's ok. The cache will get filled
* up organically as messages are passed to the thread. */
/* Enqueue all messages into the thread-local pool */
for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
thread->msg_cache_count++;
rc = spdk_ring_enqueue(thread->msg_pool, (void **)&msgs[i], 1, NULL);
if (rc != 1) {
SPDK_WARNLOG("Failed to enqueue message %d to pool\n", i);
/* Return unused messages to global pool */
for (; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
spdk_mempool_put(g_spdk_msg_mempool, msgs[i]);
}
break;
}
}
}

Expand Down Expand Up @@ -853,6 +875,8 @@ spdk_thread_get_from_ctx(void *ctx)
return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}

static void _thread_free_msg(struct spdk_thread *thread, struct spdk_msg *msg);

static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
Expand Down Expand Up @@ -899,14 +923,8 @@ msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)

SPIN_ASSERT(thread->lock_count == 0, SPIN_ERR_HOLD_DURING_SWITCH);

if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
/* Insert the messages at the head. We want to re-use the hot
* ones. */
SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
thread->msg_cache_count++;
} else {
spdk_mempool_put(g_spdk_msg_mempool, msg);
}
/* Return message to pool */
_thread_free_msg(thread, msg);
}

return count;
Expand Down Expand Up @@ -1411,6 +1429,26 @@ thread_send_msg_notification(const struct spdk_thread *target_thread)
}
}
}
/**
 * Release a message that is no longer needed.
 *
 * The message is first offered to thread->msg_pool. If that thread has no
 * pool, or the pool does not accept the entry, the message is returned to
 * the global g_spdk_msg_mempool instead.
 */
static void
_thread_free_msg(struct spdk_thread *thread, struct spdk_msg *msg)
{
	/* Preferred path: recycle the message into the per-thread ring. */
	if (thread->msg_pool != NULL &&
	    spdk_ring_enqueue(thread->msg_pool, (void **)&msg, 1, NULL) == 1) {
		return;
	}

	/* Fallback: no ring, or the ring did not take the message. */
	spdk_mempool_put(g_spdk_msg_mempool, msg);
}


int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
Expand All @@ -1429,23 +1467,22 @@ spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx
local_thread = _get_thread();

msg = NULL;
if (local_thread != NULL) {
if (local_thread->msg_cache_count > 0) {
msg = SLIST_FIRST(&local_thread->msg_cache);
assert(msg != NULL);
SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
local_thread->msg_cache_count--;
if (local_thread != NULL && local_thread->msg_pool) {
/* FAST PATH: Try thread-local pool */
if (spdk_ring_dequeue(local_thread->msg_pool, (void **)&msg, 1) == 1) {
goto got_msg;
}
}

if (msg == NULL) {
msg = spdk_mempool_get(g_spdk_msg_mempool);
if (!msg) {
SPDK_ERRLOG("msg could not be allocated\n");
abort();
}
/* SLOW PATH: Allocate from global pool (rare - only when local pool exhausted) */
msg = spdk_mempool_get(g_spdk_msg_mempool);
if (!msg) {
SPDK_ERRLOG("msg could not be allocated\n");
abort();
}

got_msg:

msg->fn = fn;
msg->arg = ctx;

Expand Down
2 changes: 1 addition & 1 deletion mk/spdk.common.mk
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ LDFLAGS += -fsanitize=fuzzer-no-link
SYS_LIBS += $(CONFIG_FUZZER_LIB)
endif

SPDK_GIT_COMMIT := c19d07f46274d31201c0c4db3775a655d68f5f38
SPDK_GIT_COMMIT := da93f4cb13afa0fbf8df26c9c5d0ed1c8ab0aef6
ifneq (, $(SPDK_GIT_COMMIT))
COMMON_CFLAGS += -DSPDK_GIT_COMMIT=$(SPDK_GIT_COMMIT)
endif
Expand Down
2 changes: 1 addition & 1 deletion mk/spdk.modules.mk
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ endif

ifeq ($(CONFIG_RBD),y)
BLOCKDEV_MODULES_LIST += bdev_rbd
BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd
BLOCKDEV_MODULES_PRIVATE_LIBS += -lrados -lrbd -lstdc++
endif

ifeq ($(CONFIG_DAOS),y)
Expand Down
1 change: 1 addition & 0 deletions module/bdev/rbd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ SO_VER := 8
SO_MINOR := 0

C_SRCS = bdev_rbd.c bdev_rbd_rpc.c
CXX_SRCS = bdev_rbd_spdk_context_wq.cpp
LIBNAME = bdev_rbd

SPDK_MAP_FILE = $(SPDK_ROOT_DIR)/mk/spdk_blank.map
Expand Down
47 changes: 45 additions & 2 deletions module/bdev/rbd/bdev_rbd.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "spdk/stdinc.h"

#include "bdev_rbd.h"
#include "bdev_rbd_spdk_context_wq.h"

#include <rbd/librbd.h>
#include <rados/librados.h>
Expand All @@ -28,6 +29,7 @@ static int bdev_rbd_count = 0;
* global parameter to control CRC32C usage in RBD write operations.
*/
static bool g_rbd_with_crc32c = false;
static bool g_rbd_with_spdk_wq = false;

struct bdev_rbd_pool_ctx {
rados_t *cluster_p;
Expand Down Expand Up @@ -78,6 +80,9 @@ struct bdev_rbd {
int (*reservation_fn_cbk)(void *ns);
char cluster_fsid[37];

/* SPDK ContextWQ for this bdev */
struct bdev_rbd_spdk_context_wq *spdk_context_wq;

};

struct bdev_rbd_io_channel {
Expand Down Expand Up @@ -229,6 +234,16 @@ bdev_rbd_free(struct bdev_rbd *rbd)
rbd_close(rbd->image);
}

/* Clean up SPDK ContextWQ after RBD image is closed.
* This ensures no new I/O completions can occur after ContextWQ is destroyed.
* The drain() function in the destructor will wait for any pending messages
* to complete before the ContextWQ is actually destroyed.
*/
if (rbd->spdk_context_wq != NULL) {
bdev_rbd_spdk_context_wq_destroy(rbd->spdk_context_wq);
rbd->spdk_context_wq = NULL;
}

free(rbd->disk.name);
free(rbd->rbd_name);
free(rbd->user_id);
Expand Down Expand Up @@ -563,11 +578,26 @@ bdev_rbd_init_context(void *arg)
}

assert(io_ctx != NULL);
if (g_rbd_with_spdk_wq) {
/* Find reactor thread, create SpdkContextWQ if available, then open with context_wq (NULL uses default AsioContextWQ) */
struct spdk_thread *reactor_thread = bdev_rbd_find_reactor_thread();
rbd->spdk_context_wq = bdev_rbd_spdk_context_wq_create_from_ioctx(
*io_ctx, reactor_thread);
if (rbd->spdk_context_wq == NULL) {
SPDK_NOTICELOG("rbd_with_spdk_wq is enabled but SpdkContextWQ was not created (no reactor thread or allocation failure); "
"falling back to default AsioContextWQ for RBD image %s/%s\n",
rbd->pool_name, rbd->rbd_name);
}
} else {
rbd->spdk_context_wq = NULL;
SPDK_NOTICELOG("rbd_with_spdk_wq is disabled, using AsioContextWQ for RBD image %s/%s\n",
rbd->pool_name, rbd->rbd_name);
}
if (rbd->rbd_read_only) {
SPDK_DEBUGLOG(bdev_rbd, "Will open RBD image %s/%s as read-only\n", rbd->pool_name, rbd->rbd_name);
rc = rbd_open_read_only(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
rc = rbd_open_read_only_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq);
} else {
rc = rbd_open(*io_ctx, rbd->rbd_name, &rbd->image, NULL);
rc = rbd_open_with_context_wq(*io_ctx, rbd->rbd_name, &rbd->image, NULL, rbd->spdk_context_wq);
}
if (rc < 0) {
SPDK_ERRLOG("Failed to open specified rbd device\n");
Expand Down Expand Up @@ -2111,3 +2141,16 @@ bdev_rbd_set_with_crc32c(bool enable)
{
g_rbd_with_crc32c = enable;
}

/** Return the current value of the module-level rbd_with_spdk_wq flag. */
bool
bdev_rbd_get_with_spdk_wq(void)
{
return g_rbd_with_spdk_wq;
}

/**
 * Enable or disable SPDK ContextWQ for RBD operations.
 *
 * This only stores the value in the module-level flag; it performs no other
 * work itself.
 */
void
bdev_rbd_set_with_spdk_wq(bool enable)
{
g_rbd_with_spdk_wq = enable;
}
15 changes: 15 additions & 0 deletions module/bdev/rbd/bdev_rbd.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,19 @@ bool bdev_rbd_get_with_crc32c(void);
*/
void bdev_rbd_set_with_crc32c(bool enable);

/**
* Get the current rbd_with_spdk_wq setting.
*
* \return true if SPDK ContextWQ is enabled, false otherwise
*/
bool bdev_rbd_get_with_spdk_wq(void);

/**
* Set the rbd_with_spdk_wq parameter to enable/disable SPDK ContextWQ
* for RBD operations.
*
* \param enable true to enable SPDK ContextWQ, false to disable (uses AsioContextWQ)
*/
void bdev_rbd_set_with_spdk_wq(bool enable);

#endif /* SPDK_BDEV_RBD_H */
50 changes: 50 additions & 0 deletions module/bdev/rbd/bdev_rbd_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,53 @@ SPDK_RPC_REGISTER("bdev_rbd_wait_for_latest_osdmap", rpc_bdev_rbd_wait_for_lates

SPDK_RPC_REGISTER("bdev_rbd_get_with_crc32c", rpc_bdev_rbd_get_with_crc32c, SPDK_RPC_RUNTIME)
SPDK_RPC_REGISTER("bdev_rbd_set_with_crc32c", rpc_bdev_rbd_set_with_crc32c, SPDK_RPC_STARTUP)

/**
 * JSON-RPC handler for "bdev_rbd_get_with_spdk_wq".
 *
 * Replies with the current rbd_with_spdk_wq setting as a JSON boolean.
 * The params argument is not examined.
 */
static void
rpc_bdev_rbd_get_with_spdk_wq(struct spdk_jsonrpc_request *request,
			      const struct spdk_json_val *params)
{
	struct spdk_json_write_ctx *writer = spdk_jsonrpc_begin_result(request);
	bool enabled = bdev_rbd_get_with_spdk_wq();

	spdk_json_write_bool(writer, enabled);
	spdk_jsonrpc_end_result(request, writer);
}

/**
 * Decoded parameters of the bdev_rbd_set_with_spdk_wq RPC.
 */
struct rpc_bdev_rbd_set_with_spdk_wq {
/* Requested new value of the rbd_with_spdk_wq setting. */
bool enable;
};

/* Maps the JSON member "enable" onto the boolean field of the request struct. */
static const struct spdk_json_object_decoder rpc_bdev_rbd_set_with_spdk_wq_decoders[] = {
{"enable", offsetof(struct rpc_bdev_rbd_set_with_spdk_wq, enable), spdk_json_decode_bool},
};

static void
rpc_bdev_rbd_set_with_spdk_wq(struct spdk_jsonrpc_request *request,
const struct spdk_json_val *params)
{
struct rpc_bdev_rbd_set_with_spdk_wq req = {};
struct spdk_json_write_ctx *w;

if (spdk_json_decode_object(params, rpc_bdev_rbd_set_with_spdk_wq_decoders,
SPDK_COUNTOF(rpc_bdev_rbd_set_with_spdk_wq_decoders),
&req)) {
spdk_jsonrpc_send_error_response(request, SPDK_JSONRPC_ERROR_INVALID_PARAMS,
"Missing or invalid enable parameter");
return;
}

bdev_rbd_set_with_spdk_wq(req.enable);

w = spdk_jsonrpc_begin_result(request);
spdk_json_write_bool(w, bdev_rbd_get_with_spdk_wq());
spdk_jsonrpc_end_result(request, w);
}

/* The getter is registered with SPDK_RPC_RUNTIME, the setter with SPDK_RPC_STARTUP. */
SPDK_RPC_REGISTER("bdev_rbd_get_with_spdk_wq", rpc_bdev_rbd_get_with_spdk_wq, SPDK_RPC_RUNTIME)
SPDK_RPC_REGISTER("bdev_rbd_set_with_spdk_wq", rpc_bdev_rbd_set_with_spdk_wq, SPDK_RPC_STARTUP)
Loading