From a38a9dc4976dc40005efe082bee5392d3cb3c80a Mon Sep 17 00:00:00 2001 From: Yanfei Guo Date: Tue, 11 Nov 2025 10:33:26 -0600 Subject: [PATCH 1/2] ch4/posix: clean up eager payload limit calculation The reduction of MAX_ALIGNMENT is because iqueue eager module may internally alignment the data. Move the calculation into iqueue to make it clearer. --- src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h | 5 ++++- src/mpid/ch4/shm/posix/posix_am.h | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h index 8cb1b6d6711..4104637652d 100644 --- a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h +++ b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h @@ -11,7 +11,10 @@ MPL_STATIC_INLINE_PREFIX size_t MPIDI_POSIX_eager_payload_limit(void) { - return MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_CELL_SIZE - sizeof(MPIDI_POSIX_eager_iqueue_cell_t); + /* reduce the eager payload limit by MAX_ALIGNMENT to account for alignment in + * MPIDI_POSIX_eager_send below */ + return MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_CELL_SIZE - sizeof(MPIDI_POSIX_eager_iqueue_cell_t) + - MAX_ALIGNMENT; } MPL_STATIC_INLINE_PREFIX size_t MPIDI_POSIX_eager_buf_limit(void) diff --git a/src/mpid/ch4/shm/posix/posix_am.h b/src/mpid/ch4/shm/posix/posix_am.h index a0f3d10ebe6..a0aeb6b08fd 100644 --- a/src/mpid/ch4/shm/posix/posix_am.h +++ b/src/mpid/ch4/shm/posix/posix_am.h @@ -13,7 +13,7 @@ MPL_STATIC_INLINE_PREFIX MPI_Aint MPIDI_POSIX_am_eager_limit(void) { - return MPIDI_POSIX_eager_payload_limit() - MAX_ALIGNMENT; + return MPIDI_POSIX_eager_payload_limit(); } MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_do_am_send_hdr(int grank, From 75ca98129ae52e8bfc9d482224c3982b051af448 Mon Sep 17 00:00:00 2001 From: Yanfei Guo Date: Tue, 11 Nov 2025 10:33:54 -0600 Subject: [PATCH 2/2] ch4/posix/iqueue: add queue pairs for tiny message Create ring buffer based fast boxes between each pair of local processes. The cell size of a ring buffer must be divisible by cache line size. The number of cells in a ring buffer must be power of two. --- .../ch4/shm/posix/eager/iqueue/Makefile.mk | 2 + .../ch4/shm/posix/eager/iqueue/iqueue_impl.h | 1 + .../ch4/shm/posix/eager/iqueue/iqueue_init.c | 87 +++++++++++- .../ch4/shm/posix/eager/iqueue/iqueue_qp.c | 65 +++++++++ .../ch4/shm/posix/eager/iqueue/iqueue_qp.h | 93 +++++++++++++ .../ch4/shm/posix/eager/iqueue/iqueue_recv.h | 45 ++++++- .../ch4/shm/posix/eager/iqueue/iqueue_send.h | 126 +++++++++++++++++- .../ch4/shm/posix/eager/iqueue/iqueue_types.h | 56 +++++++- src/mpid/ch4/src/mpidig_pt2pt_callbacks.c | 2 + src/mpid/common/genq/mpidu_genq_shmem_pool.h | 14 ++ src/mpl/include/mpl_base.h | 1 + 11 files changed, 475 insertions(+), 17 deletions(-) create mode 100644 src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.c create mode 100644 src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.h diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/Makefile.mk b/src/mpid/ch4/shm/posix/eager/iqueue/Makefile.mk index cd5cc1ca969..4e5461db2ae 100644 --- a/src/mpid/ch4/shm/posix/eager/iqueue/Makefile.mk +++ b/src/mpid/ch4/shm/posix/eager/iqueue/Makefile.mk @@ -7,9 +7,11 @@ if BUILD_CH4_SHM_POSIX_EAGER_IQUEUE noinst_HEADERS += src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h \ src/mpid/ch4/shm/posix/eager/iqueue/iqueue_recv.h \ + src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.h \ src/mpid/ch4/shm/posix/eager/iqueue/posix_eager_inline.h mpi_core_sources += src/mpid/ch4/shm/posix/eager/iqueue/func_table.c \ + src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.c \ src/mpid/ch4/shm/posix/eager/iqueue/iqueue_init.c endif diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_impl.h b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_impl.h index 3a9b78055ee..fcdf9f4f417 100644 --- a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_impl.h +++ b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_impl.h @@ -9,5 +9,6 @@ #include #include "mpidu_init_shm.h" #include "iqueue_types.h" +#include "iqueue_qp.h" #endif /* POSIX_EAGER_IQUEUE_IMPL_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_init.c b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_init.c index 5283e9aac7d..5c4b2d49f80 100644 --- a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_init.c +++ b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_init.c @@ -31,6 +31,35 @@ description : >- Size of each cell. + - name : MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_NUM_CELLS + category : CH4 + type : int + default : 64 + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + The number of cells in each ring buffer. + + - name : MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_CELL_SIZE + category : CH4 + type : int + default : 320 + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + Size of each cell of ring buffer. + + - name : MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE + category : CH4 + type : boolean + default : false + class : none + verbosity : MPI_T_VERBOSITY_USER_BASIC + scope : MPI_T_SCOPE_ALL_EQ + description : >- + Control if ring buffers are enabled. === END_MPI_T_CVAR_INFO_BLOCK === */ @@ -45,6 +74,7 @@ static int init_transport(void *slab, int vci_src, int vci_dst) transport->num_cells = MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_NUM_CELLS; transport->size_of_cell = MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_CELL_SIZE; + transport->qp = NULL; if (MPIR_CVAR_CH4_SHM_POSIX_TOPO_ENABLE) { int queue_types[2] = { @@ -75,6 +105,35 @@ static int init_transport(void *slab, int vci_src, int vci_dst) MPIDU_GENQ_SHMEM_QUEUE_TYPE__MPSC); MPIR_ERR_CHECK(mpi_errno); + if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) { + int buf_idx_base = MPIR_Process.local_rank; + int rb_size = MPIDI_POSIX_eager_iqueue_rb_size(MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_CELL_SIZE, + MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_NUM_CELLS); + char *qp_base = (char *) slab + MPIDI_POSIX_eager_iqueue_global.qp_offset; + + transport->qp = MPL_malloc(sizeof(MPIDI_POSIX_eager_iqueue_qp_t *) + * MPIR_Process.local_size, MPL_MEM_SHM); + MPIR_Assert(transport->qp); + + for (int peer_rank = 0; peer_rank < MPIR_Process.local_size; peer_rank++) { + if (peer_rank == MPIR_Process.local_rank) { + transport->qp[peer_rank] = NULL; + continue; + } + int send_idx = MPIR_Process.local_rank * MPIR_Process.local_size + peer_rank; + int recv_idx = peer_rank * MPIR_Process.local_size + MPIR_Process.local_rank; + char *send_slab = qp_base + send_idx * rb_size; + char *recv_slab = qp_base + recv_idx * rb_size; + MPIDI_POSIX_eager_iqueue_qp_t *qp = + MPIDI_POSIX_eager_iqueue_qp_init(send_slab, recv_slab, + MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_CELL_SIZE, + MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_NUM_CELLS, + peer_rank); + qp->cell_pool = transport->cell_pool; + transport->qp[peer_rank] = qp; + } + } + fn_exit: return mpi_errno; fn_fail: @@ -93,9 +152,21 @@ int MPIDI_POSIX_iqueue_shm_size(int local_size) MPIDU_genq_shmem_pool_size(cell_size, num_cells, local_size, num_free_queue); int terminal_size = local_size * sizeof(MPIDU_genq_shmem_queue_u); - int slab_size = pool_size + terminal_size; + int slab_size = MPL_ROUND_UP_ALIGN(pool_size + terminal_size, sysconf(_SC_PAGESIZE)); MPIDI_POSIX_eager_iqueue_global.terminal_offset = pool_size; + + if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) { + int qp_cell_size = MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_CELL_SIZE; + int qp_num_cells = MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_NUM_CELLS; + + int total_qp_size = MPIDI_POSIX_eager_iqueue_qp_size(qp_cell_size, qp_num_cells) + * local_size * local_size; + + MPIDI_POSIX_eager_iqueue_global.qp_offset = slab_size; + slab_size += total_qp_size; + } + MPIDI_POSIX_eager_iqueue_global.slab_size = slab_size; } @@ -184,6 +255,13 @@ int MPIDI_POSIX_iqueue_finalize(void) MPIR_ERR_CHECK(mpi_errno); MPIDI_POSIX_eager_iqueue_global.root_slab = NULL; + + if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) { + for (int i = 0; i < MPIR_Process.local_size; i++) { + MPIDI_POSIX_eager_iqueue_qp_free(&transport->qp[i]); + } + MPL_free(transport->qp); + } } if (MPIDI_POSIX_eager_iqueue_global.all_vci_slab) { @@ -198,6 +276,13 @@ int MPIDI_POSIX_iqueue_finalize(void) mpi_errno = MPIDU_genq_shmem_pool_destroy(transport->cell_pool); MPIR_ERR_CHECK(mpi_errno); + + if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) { + for (int i = 0; i < MPIR_Process.local_size; i++) { + MPIDI_POSIX_eager_iqueue_qp_free(&transport->qp[i]); + } + MPL_free(transport->qp); + } } } diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.c b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.c new file mode 100644 index 00000000000..209e4890d3e --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.c @@ -0,0 +1,65 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#include +#include "iqueue_noinline.h" + +MPIDI_POSIX_eager_iqueue_qp_t *MPIDI_POSIX_eager_iqueue_qp_init(void *send_slab, void *recv_slab, + int cell_size, int num_cells, + int peer_rank) +{ + MPIDI_POSIX_eager_iqueue_qp_t *qp = NULL; + + MPIR_Assert(send_slab); + MPIR_Assert(recv_slab); + MPIR_Assert(MPL_CHECK_ALIGN((uintptr_t) send_slab, sysconf(_SC_PAGESIZE))); + MPIR_Assert(MPL_CHECK_ALIGN((uintptr_t) recv_slab, sysconf(_SC_PAGESIZE))); + MPIR_Assert(MPL_CHECK_ALIGN(cell_size, MPL_CACHELINE_SIZE)); + MPIR_Assert(MPL_is_pof2(num_cells)); + MPIR_Assert(peer_rank != MPIR_Process.local_rank); + + qp = (MPIDI_POSIX_eager_iqueue_qp_t *) MPL_malloc(sizeof(MPIDI_POSIX_eager_iqueue_qp_t), + MPL_MEM_SHM); + if (qp == NULL) { + goto fn_fail; + } + + qp->cell_size = cell_size; + qp->num_cells = num_cells; + + qp->send.cntr = (MPIDI_POSIX_eager_iqueue_rb_cntr_t *) send_slab; + qp->send.base = send_slab + sizeof(MPIDI_POSIX_eager_iqueue_rb_cntr_t); + qp->send.next_seq = 0; + qp->send.last_ack = 0; + + qp->recv.cntr = (MPIDI_POSIX_eager_iqueue_rb_cntr_t *) recv_slab; + qp->recv.base = recv_slab + sizeof(MPIDI_POSIX_eager_iqueue_rb_cntr_t); + qp->recv.next_seq = 0; + qp->recv.last_ack = 0; + + qp->peer_rank = peer_rank; + + /* init counter will do first-touch NUMA affinity for the recv slab */ + memset(recv_slab, 0, MPIDI_POSIX_eager_iqueue_rb_size(cell_size, num_cells)); + + fn_exit: + return qp; + fn_fail: + if (qp) { + MPL_free(qp); + qp = NULL; + } + goto fn_exit; +} + +void MPIDI_POSIX_eager_iqueue_qp_free(MPIDI_POSIX_eager_iqueue_qp_t ** qp) +{ + if (*qp == NULL) { + return; + } + + MPL_free(*qp); + qp = NULL; +} diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.h b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.h new file mode 100644 index 00000000000..9fa9f2f8b8b --- /dev/null +++ b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_qp.h @@ -0,0 +1,93 @@ +/* + * Copyright (C) by Argonne National Laboratory + * See COPYRIGHT in top-level directory + */ + +#ifndef POSIX_EAGER_IQUEUE_QP_H_INCLUDED +#define POSIX_EAGER_IQUEUE_QP_H_INCLUDED + +#include + +MPIDI_POSIX_eager_iqueue_qp_t *MPIDI_POSIX_eager_iqueue_qp_init(void *send_slab, void *recv_slab, + int cell_size, int num_cells, + int peer_rank); +void MPIDI_POSIX_eager_iqueue_qp_free(MPIDI_POSIX_eager_iqueue_qp_t ** qp); + +MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_eager_iqueue_rb_size(int cell_size, int num_cells) +{ + return MPL_ROUND_UP_ALIGN((sizeof(MPIDI_POSIX_eager_iqueue_rb_cntr_t) + num_cells * cell_size), + sysconf(_SC_PAGESIZE)); +} + +MPL_STATIC_INLINE_PREFIX int MPIDI_POSIX_eager_iqueue_qp_size(int cell_size, int num_cells) +{ + return 2 * MPIDI_POSIX_eager_iqueue_rb_size(cell_size, num_cells); +} + +MPL_STATIC_INLINE_PREFIX + int MPIDI_POSIX_eager_iqueue_qp_cntr_to_idx(MPIDI_POSIX_eager_iqueue_qp_t * qp, uint64_t cntr) +{ + return cntr & (qp->num_cells - 1); +} + +MPL_STATIC_INLINE_PREFIX void +*MPIDI_POSIX_eager_iqueue_qp_get_send_cell(MPIDI_POSIX_eager_iqueue_qp_t * qp) +{ + char *cell = NULL; + if (qp->send.next_seq - qp->send.last_ack == qp->num_cells) { + uint64_t new_ack = MPL_atomic_acquire_load_uint64(&qp->send.cntr->ack); + if (new_ack == qp->send.last_ack) { + return NULL; + } else { + for (int i = qp->send.last_ack; i < new_ack; i++) { + MPIDI_POSIX_eager_iqueue_cell_ext_t *tmp = (MPIDI_POSIX_eager_iqueue_cell_ext_t *) + (qp->send.base + + MPIDI_POSIX_eager_iqueue_qp_cntr_to_idx(qp, i) * qp->cell_size); + if (tmp->base.type & MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_BUF) { + cell = MPIDU_genq_shmem_pool_handle_to_cell(qp->cell_pool, tmp->buf_handle); + MPIDU_genq_shmem_pool_cell_free(qp->cell_pool, cell); + } + } + } + qp->send.last_ack = new_ack; + } + + cell = qp->send.base + MPIDI_POSIX_eager_iqueue_qp_cntr_to_idx(qp, qp->send.next_seq) + * qp->cell_size; + + return cell; +} + +MPL_STATIC_INLINE_PREFIX void +*MPIDI_POSIX_eager_iqueue_qp_get_recv_cell(MPIDI_POSIX_eager_iqueue_qp_t * qp) +{ + char *cell = NULL; + if (qp->recv.last_ack == qp->recv.next_seq) { + uint64_t new_seq = MPL_atomic_acquire_load_uint64(&qp->recv.cntr->seq); + if (new_seq == qp->recv.next_seq) { + return NULL; + } + qp->recv.next_seq = new_seq; + } + + cell = qp->recv.base + MPIDI_POSIX_eager_iqueue_qp_cntr_to_idx(qp, qp->recv.last_ack) + * qp->cell_size; + + return cell; +} + +MPL_STATIC_INLINE_PREFIX void +MPIDI_POSIX_eager_iqueue_qp_send_commit(MPIDI_POSIX_eager_iqueue_qp_t * qp) +{ + qp->send.next_seq++; + MPL_atomic_release_store_uint64(&qp->send.cntr->seq, qp->send.next_seq); +} + +MPL_STATIC_INLINE_PREFIX void +MPIDI_POSIX_eager_iqueue_qp_recv_complete(MPIDI_POSIX_eager_iqueue_qp_t * qp) +{ + qp->recv.last_ack++; + MPL_atomic_release_store_uint64(&qp->recv.cntr->ack, qp->recv.last_ack); +} + +#endif /* POSIX_EAGER_IQUEUE_QP_H_INCLUDED */ diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_recv.h b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_recv.h index 39f999a87bb..0e5fa33ba6c 100644 --- a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_recv.h +++ b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_recv.h @@ -23,19 +23,45 @@ MPIDI_POSIX_eager_recv_begin(int vci, MPIDI_POSIX_eager_recv_transaction_t * tra for (int vci_src = 0; vci_src < max_vcis; vci_src++) { transport = MPIDI_POSIX_eager_iqueue_get_transport(vci_src, vci); - MPIDU_genq_shmem_queue_dequeue(transport->cell_pool, transport->my_terminal, - (void **) &cell); + if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) { + MPIDI_POSIX_eager_iqueue_qp_t *qp = NULL; + for (int i = 0; i < MPIR_Process.local_size; i++) { + if (i == MPIR_Process.local_rank) + continue; + + qp = transport->qp[i]; + cell = (MPIDI_POSIX_eager_iqueue_cell_t *) + MPIDI_POSIX_eager_iqueue_qp_get_recv_cell(qp); + if (cell == NULL) { + continue; + } else { + break; + } + } + } else { + MPIDU_genq_shmem_queue_dequeue(transport->cell_pool, transport->my_terminal, + (void **) &cell); + } + if (cell) { transaction->src_local_rank = cell->from; transaction->src_vci = vci_src; transaction->dst_vci = vci; - transaction->payload = MPIDI_POSIX_EAGER_IQUEUE_CELL_PAYLOAD(cell); transaction->payload_sz = cell->payload_size; - if (likely(cell->type == MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_HDR)) { + if (cell->type & MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_BUF) { + uint64_t handle = ((MPIDI_POSIX_eager_iqueue_cell_ext_t *) cell)->buf_handle; + /* payload should be buffer mapped from the handle */ + transaction->payload = MPIDU_genq_shmem_pool_handle_to_cell(transport->cell_pool, + handle); + } else { + transaction->payload = MPIDI_POSIX_EAGER_IQUEUE_CELL_PAYLOAD(cell); + } + + if (likely(cell->type & MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_HDR)) { transaction->msg_hdr = &cell->am_header; } else { - MPIR_Assert(cell->type == MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_DATA); + MPIR_Assert(cell->type & MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_DATA); transaction->msg_hdr = NULL; } @@ -66,9 +92,14 @@ MPIDI_POSIX_eager_recv_commit(MPIDI_POSIX_eager_recv_transaction_t * transaction MPIR_FUNC_ENTER; transport = MPIDI_POSIX_eager_iqueue_get_transport(transaction->src_vci, transaction->dst_vci); - cell = (MPIDI_POSIX_eager_iqueue_cell_t *) transaction->transport.iqueue.pointer_to_cell; - MPIDU_genq_shmem_pool_cell_free(transport->cell_pool, cell); + if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) { + MPIDI_POSIX_eager_iqueue_qp_t *qp = transport->qp[transaction->src_local_rank]; + MPIDI_POSIX_eager_iqueue_qp_recv_complete(qp); + } else { + cell = (MPIDI_POSIX_eager_iqueue_cell_t *) transaction->transport.iqueue.pointer_to_cell; + MPIDU_genq_shmem_pool_cell_free(transport->cell_pool, cell); + } MPIR_FUNC_EXIT; } diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h index 4104637652d..db8f312ee61 100644 --- a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h +++ b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_send.h @@ -11,8 +11,6 @@ MPL_STATIC_INLINE_PREFIX size_t MPIDI_POSIX_eager_payload_limit(void) { - /* reduce the eager payload limit by MAX_ALIGNMENT to account for alignment in - * MPIDI_POSIX_eager_send below */ return MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_CELL_SIZE - sizeof(MPIDI_POSIX_eager_iqueue_cell_t) - MAX_ALIGNMENT; } @@ -22,6 +20,125 @@ MPL_STATIC_INLINE_PREFIX size_t MPIDI_POSIX_eager_buf_limit(void) return MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_CELL_SIZE; } +MPL_STATIC_INLINE_PREFIX int +MPIDI_POSIX_eager_send_qp(int grank, MPIDI_POSIX_am_header_t * msg_hdr, const void *am_hdr, + MPI_Aint am_hdr_sz, const void *buf, MPI_Aint count, + MPI_Datatype datatype, MPI_Aint offset, int src_vci, int dst_vci, + MPI_Aint * bytes_sent) +{ + MPIDI_POSIX_eager_iqueue_transport_t *transport; + MPIDI_POSIX_eager_iqueue_cell_t *cell, *qcell; + MPIDU_genq_shmem_queue_t terminal; + size_t capacity, available; + char *payload; + int ret = MPIDI_POSIX_OK; + MPI_Aint packed_size = 0; + bool need_iov_buf = false; + + MPIR_FUNC_ENTER; + + MPI_Aint data_sz; + MPIDI_Datatype_check_size(datatype, count, data_sz); + + /* Get the transport object that holds all of the global variables. */ + transport = MPIDI_POSIX_eager_iqueue_get_transport(src_vci, dst_vci); + + int dst_local_rank = MPIDI_SHM_global.local_ranks[grank]; + bool is_topo_local = + (MPIDI_POSIX_global.local_rank_dist[dst_local_rank] == MPIDI_POSIX_DIST__LOCAL); + + MPIDI_POSIX_eager_iqueue_qp_t *qp = transport->qp[dst_local_rank]; + + need_iov_buf = (data_sz - offset + MPL_ROUND_UP_ALIGN(am_hdr_sz, MAX_ALIGNMENT)) + > (qp->cell_size - sizeof(MPIDI_POSIX_eager_iqueue_cell_t)); + /* Try to get a new cell to hold the message. If a cell wasn't available, + * let the caller know that we weren't able to send the message immediately. + */ + qcell = (MPIDI_POSIX_eager_iqueue_cell_t *) MPIDI_POSIX_eager_iqueue_qp_get_send_cell(qp); + if (qcell == NULL) { + ret = MPIDI_POSIX_NOK; + goto fn_exit; + } + + if (need_iov_buf) { + /* get handle of a shm buffer */ + if (is_topo_local) { + MPIDU_genq_shmem_pool_cell_alloc(transport->cell_pool, (void **) &cell, + MPIR_Process.local_rank, 0 /* intra NUMA */ , buf); + } else { + MPIDU_genq_shmem_pool_cell_alloc(transport->cell_pool, (void **) &cell, dst_local_rank, + 1 /* inter NUMA */ , buf); + } + if (cell == NULL) { + ret = MPIDI_POSIX_NOK; + goto fn_exit; + } + uint64_t handle = MPIDU_genq_shmem_pool_cell_to_handle(cell); + + qcell->type = MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_BUF; + ((MPIDI_POSIX_eager_iqueue_cell_ext_t *) qcell)->buf_handle = handle; + + /* map handle to cell */ + capacity = transport->size_of_cell; + payload = (char *) cell; + } else { + qcell->type = 0; + + cell = qcell; + capacity = qp->cell_size - sizeof(MPIDI_POSIX_eager_iqueue_cell_t); + payload = MPIDI_POSIX_EAGER_IQUEUE_CELL_PAYLOAD(cell); + } + + available = capacity; + + qcell->from = MPIR_Process.local_rank; + + /* If this is the beginning of the message, mark it as the head. Otherwise it will be the + * tail. */ + qcell->payload_size = 0; + if (am_hdr) { + MPI_Aint resized_am_hdr_sz = MPL_ROUND_UP_ALIGN(am_hdr_sz, MAX_ALIGNMENT); + qcell->am_header = *msg_hdr; + qcell->type |= MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_HDR; + /* send am_hdr if this is the first segment */ + if (is_topo_local) { + MPIR_Typerep_copy(payload, am_hdr, am_hdr_sz, MPIR_TYPEREP_FLAG_NONE); + } else { + MPIR_Typerep_copy(payload, am_hdr, am_hdr_sz, MPIR_TYPEREP_FLAG_STREAM); + } + /* make sure the data region starts at the boundary of MAX_ALIGNMENT */ + payload = payload + resized_am_hdr_sz; + qcell->payload_size += resized_am_hdr_sz; + qcell->am_header.am_hdr_sz = resized_am_hdr_sz; + available -= qcell->am_header.am_hdr_sz; + } else { + qcell->type |= MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_DATA; + } + + /* We want to skip packing of send buffer if there is no data to be sent . buf == NULL is + * not a correct check here because derived datatype can use absolute address for displacement + * which requires buffer address passed as MPI_BOTTOM which is usually NULL. count == 0 is also + * not reliable because the derived datatype could have zero block size which contains no + * data. */ + if (bytes_sent) { + if (is_topo_local) { + MPIR_Typerep_pack(buf, count, datatype, offset, payload, available, &packed_size, + MPIR_TYPEREP_FLAG_NONE); + } else { + MPIR_Typerep_pack(buf, count, datatype, offset, payload, available, &packed_size, + MPIR_TYPEREP_FLAG_STREAM); + } + qcell->payload_size += packed_size; + *bytes_sent = packed_size; + } + + MPIDI_POSIX_eager_iqueue_qp_send_commit(qp); + + fn_exit: + MPIR_FUNC_EXIT; + return ret; +} + /* This function attempts to send the next chunk of a message via the queue. If no cells are * available, this function will return and the caller is expected to queue the message for later * and retry. @@ -52,6 +169,11 @@ MPIDI_POSIX_eager_send(int grank, MPIDI_POSIX_am_header_t * msg_hdr, const void MPIR_FUNC_ENTER; + if (MPIR_CVAR_CH4_SHM_POSIX_IQUEUE_QP_ENABLE) { + return MPIDI_POSIX_eager_send_qp(grank, msg_hdr, am_hdr, am_hdr_sz, buf, count, datatype, + offset, src_vci, dst_vci, bytes_sent); + } + /* Get the transport object that holds all of the global variables. */ transport = MPIDI_POSIX_eager_iqueue_get_transport(src_vci, dst_vci); diff --git a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_types.h b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_types.h index f77799a25e8..e28da0b9dac 100644 --- a/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_types.h +++ b/src/mpid/ch4/shm/posix/eager/iqueue/iqueue_types.h @@ -10,19 +10,59 @@ #include "mpidu_init_shm.h" #include "mpidu_genq.h" -#define MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_HDR 0 -#define MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_DATA 1 +#define MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_HDR 0x1 +#define MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_DATA 0x2 +#define MPIDI_POSIX_EAGER_IQUEUE_CELL_TYPE_BUF 0x4 -typedef struct MPIDI_POSIX_eager_iqueue_cell MPIDI_POSIX_eager_iqueue_cell_t; - -/* Each cell contains some data being communicated from one process to another. */ -struct MPIDI_POSIX_eager_iqueue_cell { +/* Each cell contains some data being communicated from one process to another. + * The struct will be packed by default, occuping 16 bytes. Used in regular + * queue and fast box */ +typedef struct MPIDI_POSIX_eager_iqueue_cell { uint16_t type; /* Type of cell (head/tail/etc.) */ uint16_t from; /* Who is the message in the cell from */ uint32_t payload_size; /* Size of the message in the cell */ MPIDI_POSIX_am_header_t am_header; /* If this cell is the beginning of a message, it will have * an active message header and this will point to it. */ -}; +} MPIDI_POSIX_eager_iqueue_cell_t; + +typedef struct MPIDI_POSIX_eager_iqueue_cell_ext { + MPIDI_POSIX_eager_iqueue_cell_t base; + uint64_t buf_handle; +} MPIDI_POSIX_eager_iqueue_cell_ext_t; + +/* Note we deliberately not handling or checking the counter wrap around case + * because 64bit counters would take way too long to overflow. Generally, two 2 + * GHz CPU cores can sustain abount 200 million enqueue/dequeue per second with + * ring buffer. That's >90 billion seconds (>2900 years) just updating atomic + * counters without actually send any data. In reality, read/write data or having + * multiple processes communicating will strech the time to overflow even further. + */ +typedef struct { + /* *INDENT-OFF* */ + _Alignas(MPL_CACHELINE_SIZE) MPL_atomic_uint64_t seq; + _Alignas(MPL_CACHELINE_SIZE) MPL_atomic_uint64_t ack; + /* *INDENT-ON* */ +} MPIDI_POSIX_eager_iqueue_rb_cntr_t; + +typedef struct { + int cell_size; + int num_cells; + struct { + MPIDI_POSIX_eager_iqueue_rb_cntr_t *cntr; + char *base; + uint64_t next_seq; + uint64_t last_ack; + } send; + struct { + MPIDI_POSIX_eager_iqueue_rb_cntr_t *cntr; + char *base; + uint64_t next_seq; + uint64_t last_ack; + } recv; + /* nice to have stuff, not really needed */ + int peer_rank; + MPIDU_genq_shmem_pool_t cell_pool; +} MPIDI_POSIX_eager_iqueue_qp_t; typedef struct MPIDI_POSIX_eager_iqueue_transport { int num_cells; /* The number of cells allocated to each terminal in this transport */ @@ -31,6 +71,7 @@ typedef struct MPIDI_POSIX_eager_iqueue_transport { * describe each of the cells */ MPIDU_genq_shmem_queue_t my_terminal; MPIDU_genq_shmem_pool_t cell_pool; + MPIDI_POSIX_eager_iqueue_qp_t **qp; } MPIDI_POSIX_eager_iqueue_transport_t; typedef struct MPIDI_POSIX_eager_iqueue_global { @@ -38,6 +79,7 @@ typedef struct MPIDI_POSIX_eager_iqueue_global { /* sizes for shmem slabs */ int slab_size; int terminal_offset; + int qp_offset; /* shmem slabs */ void *root_slab; void *all_vci_slab; diff --git a/src/mpid/ch4/src/mpidig_pt2pt_callbacks.c b/src/mpid/ch4/src/mpidig_pt2pt_callbacks.c index 986c797e917..f77af8a6a50 100644 --- a/src/mpid/ch4/src/mpidig_pt2pt_callbacks.c +++ b/src/mpid/ch4/src/mpidig_pt2pt_callbacks.c @@ -381,6 +381,8 @@ int MPIDIG_send_target_msg_cb(void *am_hdr, void *data, MPI_Aint in_data_sz, msg_mode = MSG_MODE_EAGER; } + /* printf("send target msg cb, hdr: src_rank %d, tag %d, context_id %d, data_sz %ld\n", */ + /* hdr->src_rank, hdr->tag, hdr->context_id, hdr->data_sz); */ mpi_errno = match_posted_rreq(hdr->src_rank, hdr->tag, hdr->context_id, local_vci, is_local, &rreq); MPIR_ERR_CHECK(mpi_errno); diff --git a/src/mpid/common/genq/mpidu_genq_shmem_pool.h b/src/mpid/common/genq/mpidu_genq_shmem_pool.h index e3de456ba1c..d02cd3508d3 100644 --- a/src/mpid/common/genq/mpidu_genq_shmem_pool.h +++ b/src/mpid/common/genq/mpidu_genq_shmem_pool.h @@ -65,4 +65,18 @@ static inline int MPIDU_genq_shmem_pool_cell_free(MPIDU_genq_shmem_pool_t pool, return rc; } +static inline uint64_t MPIDU_genq_shmem_pool_cell_to_handle(void *cell) +{ + MPIDU_genqi_shmem_cell_header_s *cell_h = CELL_TO_HEADER(cell); + return (uint64_t) cell_h->handle; +} + +static inline char *MPIDU_genq_shmem_pool_handle_to_cell(MPIDU_genq_shmem_pool_t pool, + uint64_t handle) +{ + MPIDU_genqi_shmem_pool_s *pool_obj = (MPIDU_genqi_shmem_pool_s *) pool; + MPIDU_genqi_shmem_cell_header_s *cell_h = HANDLE_TO_HEADER(pool_obj, handle); + return HEADER_TO_CELL(cell_h); +} + #endif /* ifndef MPIDU_GENQ_SHMEM_POOL_H_INCLUDED */ diff --git a/src/mpl/include/mpl_base.h b/src/mpl/include/mpl_base.h index ec3184a694f..4222f9e3c19 100644 --- a/src/mpl/include/mpl_base.h +++ b/src/mpl/include/mpl_base.h @@ -154,5 +154,6 @@ typedef bool _Bool; #define MPL_ROUND_UP_ALIGN(a, alignment) (((a) + ((alignment) - 1)) & (~((alignment) - 1))) #define MPL_ROUND_DOWN_ALIGN(a, alignment) ((a) & (~((alignment) - 1))) +#define MPL_CHECK_ALIGN(a, alignment) ((a) % (alignment) == 0) #endif /* MPL_BASE_H_INCLUDED */