diff --git a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c
index 9cdc6d1b4d3..044657e16b2 100644
--- a/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c
+++ b/src/mpi/coll/algorithms/circ_graph/cga_request_queue.c
@@ -22,7 +22,7 @@
  * Otherwise, the algorithm is inefficient or inconsistent.
  */
 
-static int calc_chunks(MPI_Aint data_size, MPI_Aint chunk_size, int *last_msg_size_out);
+static MPI_Aint calc_chunks(MPI_Aint data_size, MPI_Aint chunk_size, MPI_Aint * last_msg_size_out);
 
 static int get_pending_id(MPII_cga_request_queue * queue, int block, int root);
 static int get_pending_req_id(MPII_cga_request_queue * queue, int block, int root);
@@ -34,8 +34,13 @@ static void remove_pending_req_id(MPII_cga_request_queue * queue, int block, int
  * - for reduce, packbuf is stored in the requests[].packbuf, and freed with the request
  */
 static void *get_persist_packbuf(MPII_cga_request_queue * queue, int block, int root);
+static bool is_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root);
+static bool is_in_staging(MPII_cga_request_queue * queue, int block, int root);
 static void add_persist_packbuf(MPII_cga_request_queue * queue, int block, int root, void *packbuf);
-static int alloc_packbuf(void **packbuf_out);
+static void set_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root);
+static void set_in_staging(MPII_cga_request_queue * queue, int block, int root);
+
+static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out);
 /* free buffers depend on where they are stored */
 static void clear_request(MPII_cga_request_queue * queue, int req_id);
 static void clear_pending(MPII_cga_request_queue * queue, int pending_id);
@@ -48,6 +53,8 @@ static int issue_pack(MPII_cga_request_queue * queue, int block, int root,
                       void *packbuf, MPIR_gpu_req * areq);
 static int issue_unpack(MPII_cga_request_queue * queue, int block, int root,
                         void *packbuf, void *tmpbuf, MPIR_gpu_req * areq);
+static int issue_staging(MPII_cga_request_queue * queue, int block, int root,
+                         bool is_load, MPIR_gpu_req * areq);
 static int issue_isend_contig(MPII_cga_request_queue * queue, int block, int root, int peer_rank,
                               MPIR_Request ** req);
 static int issue_isend_packed(MPII_cga_request_queue * queue, int block, int root,
@@ -84,8 +91,9 @@ static void debug_queue(MPII_cga_request_queue * queue);
 
 /* Routines for managing non-blocking send/recv of chunks */
 static int init_request_queue_common(MPII_cga_request_queue * queue,
-                                     int q_len, int num_pending, int num_chunks, int all_size,
-                                     int chunk_size, int last_chunk_size, bool inverse_order,
+                                     int q_len, int num_pending, MPI_Aint num_chunks, int all_size,
+                                     MPI_Aint chunk_size, MPI_Aint last_chunk_size,
+                                     bool inverse_order,
                                      void *buf, MPI_Aint count, MPI_Datatype datatype,
                                      MPIR_Comm * comm, int coll_attr)
 {
@@ -97,6 +105,7 @@ static int init_request_queue_common(MPII_cga_request_queue * queue,
     MPIR_GPU_query_pointer_attr(buf, &queue->attr);
     queue->need_pack = (!dt_contig || MPL_gpu_attr_is_dev(&queue->attr));
+    queue->need_staging = false;
 
     queue->comm = comm;
     queue->coll_attr = coll_attr;
     queue->num_chunks = num_chunks;
@@ -107,6 +116,7 @@ static int init_request_queue_common(MPII_cga_request_queue * queue,
     queue->count = count;
     queue->datatype = datatype;
     queue->inverse_order = inverse_order;
+    queue->dt_contig = dt_contig;
 
     mpi_errno = MPIR_Sched_next_tag(comm, &queue->tag);
     MPIR_ERR_CHECK(mpi_errno);
@@ -179,8 +189,8 @@ int MPII_cga_init_bcast_queue(MPII_cga_request_queue * queue, int num_pending,
     MPIR_Datatype_get_size_macro(datatype, type_size);
     data_size = count * type_size;
 
-    int last_chunk_size;
-    int num_chunks = calc_chunks(data_size, chunk_size, &last_chunk_size);
+    MPI_Aint last_chunk_size;
+    MPI_Aint num_chunks = calc_chunks(data_size, chunk_size, &last_chunk_size);
 
     bool inverse_order = false;
     mpi_errno = init_request_queue_common(queue, q_len, num_pending, num_chunks, 1,
@@ -210,8 +220,8 @@ int MPII_cga_init_allgather_queue(MPII_cga_request_queue * queue, int num_pendin
     MPI_Aint type_size;
     MPIR_Datatype_get_size_macro(datatype, type_size);
 
-    int last_chunk_size;
-    int num_chunks = calc_chunks(count * type_size, chunk_size, &last_chunk_size);
+    MPI_Aint last_chunk_size;
+    MPI_Aint num_chunks = calc_chunks(count * type_size, chunk_size, &last_chunk_size);
 
     int all_size = comm_size;
     bool inverse_order = false;
@@ -234,7 +244,7 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending,
 {
     int mpi_errno = MPI_SUCCESS;
 
-    int chunk_size = MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE;
+    MPI_Aint chunk_size = MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE;
     int q_len = MPIR_CVAR_CIRC_GRAPH_Q_LEN;
 
     /* minimum q_len is 2 */
@@ -244,6 +254,7 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending,
 
     MPI_Aint type_size;
     MPIR_Datatype_get_size_macro(datatype, type_size);
+    MPIR_Assert(type_size > 0);
 
     MPI_Aint num_chunks, chunk_count, last_chunk_count, last_chunk_size;
     if (chunk_size == 0) {
@@ -253,9 +264,6 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending,
     } else {
         /* reduction chunks have to contain whole datatypes */
         chunk_count = chunk_size / type_size;
-        if (chunk_size > 0 && chunk_size % type_size > 0) {
-            chunk_count++;
-        }
 
         num_chunks = count / chunk_count;
         last_chunk_count = count % chunk_count;
@@ -283,11 +291,12 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending,
     queue->u.reduce.type_size = type_size;
     queue->u.reduce.type_extent = extent;
     queue->u.reduce.true_lb = true_lb;
+    queue->u.reduce.true_extent = true_extent;
     queue->u.reduce.chunk_extent = chunk_count * extent;
     if (!queue->need_pack) {
         /* datatype must be contig, no pack_buf, need tmpbuf to receive chunk data */
         queue->u.reduce.tmpbuf_size = chunk_size;
-    } else if (type_size == true_extent) {
+    } else if (type_size == true_extent && type_size == extent) {
         /* datatype is contig, skip tmpbuf, we can do reduce from pack_buf */
         queue->u.reduce.tmpbuf_size = 0;
     } else {
@@ -299,6 +308,38 @@ int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending,
     return mpi_errno;
 }
 
+int MPII_cga_init_allreduce_queue(MPII_cga_request_queue * queue, int num_pending,
+                                  void *recvbuf, MPI_Aint count, MPI_Datatype datatype,
+                                  MPI_Op op, MPIR_Comm * comm, int coll_attr)
+{
+    int mpi_errno = MPII_cga_init_reduce_queue(queue, num_pending, recvbuf, count, datatype,
+                                               op, comm, coll_attr);
+    if (queue->need_pack && MPL_gpu_attr_is_dev(&queue->attr)) {
+        /* allocate the host staging buffer */
+        MPI_Aint buf_size = (count - 1) * queue->u.reduce.type_extent + queue->u.reduce.true_extent;
+        queue->u.reduce.staging_buf = MPL_malloc(buf_size, MPL_MEM_COLL);
+        if (!queue->u.reduce.staging_buf) {
+            MPIR_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**nomem");
+        }
+        queue->need_staging = true;
+    }
+    return mpi_errno;
+}
+
+int MPII_cga_switch_coll_type(MPII_cga_request_queue * queue, enum MPII_cga_type coll_type)
+{
+    /* for now, we only support switching from REDUCE to BCAST (as an ALLREDUCE composition) */
+    MPIR_Assert(queue->coll_type == MPII_CGA_REDUCE && coll_type == MPII_CGA_BCAST);
+    queue->coll_type = MPII_CGA_BCAST;
+    queue->inverse_order = false;
+    /* at this point, pending_blocks must be tracking 0, 1, ..., num_pending-1 */
+    MPIR_Assert(queue->pending_head_block == -1);
+    queue->pending_head_block = queue->num_pending;
+    queue->pending_head = 0;
+
+    return MPI_SUCCESS;
+}
+
 #define GET_BLOCK_SIZE(block) (((block) == queue->num_chunks - 1) ? queue->last_chunk_size : queue->chunk_size)
 
 /* ---- bcast ---- */
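The whole-datatype chunking above is worth a standalone illustration: rounding chunk_count down (the round-up branch is removed in this patch) keeps every chunk within chunk_size bytes, which matters once chunks are packed into fixed-size pool cells. A minimal sketch with hypothetical sizes (the real values come from the datatype and MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE); the short-final-chunk handling is an assumption, since the patch context ends before it:

    #include <assert.h>
    #include <stdio.h>

    int main(void)
    {
        long type_size = 12;    /* hypothetical, e.g. a struct of 3 ints */
        long count = 1000;      /* elements in the user buffer */
        long chunk_size = 1024; /* requested chunk size in bytes */

        /* reduction chunks have to contain whole datatypes, so round DOWN;
         * rounding up would let a chunk exceed chunk_size (and the pool
         * cell it is packed into), presumably why the round-up was dropped */
        long chunk_count = chunk_size / type_size;      /* 85 elements per chunk */
        long num_chunks = count / chunk_count;  /* 11 full chunks */
        long last_chunk_count = count % chunk_count;    /* 65 leftover elements */
        if (last_chunk_count > 0) {
            num_chunks++;       /* assumed: the remainder becomes a short final chunk */
        } else {
            last_chunk_count = chunk_count;
        }
        assert(chunk_count * type_size <= chunk_size);
        printf("%ld chunks, %ld elements each, %ld in the last\n",
               num_chunks, chunk_count, last_chunk_count);
        return 0;
    }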
@@ -512,7 +553,13 @@ static int reduce_local(MPII_cga_request_queue * queue, int block, void *buf)
     int mpi_errno = MPI_SUCCESS;
 
     void *buf_in = (char *) buf - queue->u.reduce.true_lb;
-    void *buf_inout = (char *) queue->buf + block * queue->u.reduce.chunk_extent;
+    void *buf_inout;
+    if (queue->need_staging) {
+        buf_inout = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb +
+            block * queue->u.reduce.chunk_extent;
+    } else {
+        buf_inout = (char *) queue->buf + block * queue->u.reduce.chunk_extent;
+    }
     MPI_Aint count = GET_BLOCK_SIZE(block) / queue->u.reduce.type_size;
 
     mpi_errno = MPIR_Reduce_local(buf_in, buf_inout, count, queue->datatype, queue->u.reduce.op);
@@ -575,6 +622,9 @@ int MPII_cga_testall(MPII_cga_request_queue * queue, bool * is_done)
         /* free the memory */
         MPL_free(queue->pending_blocks);
         MPL_free(queue->requests);
+        if (queue->need_staging) {
+            MPL_free(queue->u.reduce.staging_buf);
+        }
 
         *is_done = true;
 
@@ -615,14 +665,16 @@ static int issue_nb_op(MPII_cga_request_queue * queue, enum MPII_cga_op_type op_
     int req_id = queue->q_head;
 #define REQi queue->requests[req_id]
     REQi.op_type = op_type;
+    REQi.coll_type = queue->coll_type;
     REQi.peer_rank = peer_rank;
     REQi.block = block;
     REQi.root = root;
     REQi.op_stage = MPII_CGA_STAGE_START;
     REQi.issued = false;
+    REQi.staging_areq.type = MPIR_NULL_REQUEST;
 
     REQi.tmpbuf = NULL;
-    if (op_type == MPII_CGA_OP_RECV && queue->coll_type == MPII_CGA_REDUCE) {
+    if (op_type == MPII_CGA_OP_RECV && REQi.coll_type == MPII_CGA_REDUCE) {
         /* reduce need recv (or unpack) into a tmp buffer before reduce_local */
         if (queue->u.reduce.tmpbuf_size > 0) {
             REQi.tmpbuf = MPL_malloc(queue->u.reduce.tmpbuf_size, MPL_MEM_OTHER);
@@ -679,8 +731,11 @@ static int issue_pack(MPII_cga_request_queue * queue, int block, int root,
     MPI_Aint nbytes = GET_BLOCK_SIZE(block);
 
     void *src_buf = queue->buf;
+    if (queue->need_staging && is_in_staging(queue, block, root)) {
+        src_buf = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb;
+    }
     if (root > 0) {
-        src_buf = (char *) queue->buf + root * queue->buf_extent;
+        src_buf = (char *) src_buf + root * queue->buf_extent;
     }
 
     int engine_type = MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH;  /* TODO: add a cvar */
@@ -692,6 +747,38 @@ static int issue_pack(MPII_cga_request_queue * queue, int block, int root,
     return mpi_errno;
 }
 
+static int issue_staging(MPII_cga_request_queue * queue, int block, int root,
+                         bool is_load, MPIR_gpu_req * areq)
+{
+    int mpi_errno = MPI_SUCCESS;
+
+    MPI_Aint nbytes = GET_BLOCK_SIZE(block);
+    MPI_Aint chunk_count = nbytes / queue->u.reduce.type_size;
+
+    char *src_buf, *dst_buf;
+    if (is_load) {
+        src_buf = (char *) queue->buf;
+        dst_buf = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb;
+    } else {
+        src_buf = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb;
+        dst_buf = (char *) queue->buf;
+    }
+    if (root > 0) {
+        src_buf += root * queue->buf_extent;
+        dst_buf += root * queue->buf_extent;
+    }
+    MPI_Aint offset = block * queue->chunk_size;
+    src_buf += offset;
+    dst_buf += offset;
+
+    int engine_type = MPL_GPU_ENGINE_TYPE_COPY_HIGH_BANDWIDTH;  /* TODO: add a cvar */
+    mpi_errno = MPIR_Ilocalcopy_gpu(src_buf, chunk_count, queue->datatype, 0, &queue->attr,
+                                    dst_buf, chunk_count, queue->datatype, 0, NULL, engine_type, 1,
+                                    areq);
+
+    return mpi_errno;
+}
+
 static int issue_isend_contig(MPII_cga_request_queue * queue, int block, int root, int peer_rank,
                               MPIR_Request ** req)
 {
@@ -699,11 +786,14 @@ static int issue_isend_contig(MPII_cga_request_queue * queue, int block, int roo
     MPI_Aint nbytes = GET_BLOCK_SIZE(block);
     MPI_Aint offset = block * queue->chunk_size;
 
-    const void *send_buf;
+    char *send_buf = (char *) queue->buf;
+    if (is_in_staging(queue, block, root)) {
+        send_buf = (char *) queue->u.reduce.staging_buf - queue->u.reduce.true_lb;
+    }
     if (root == 0) {
-        send_buf = (char *) queue->buf + offset;
+        send_buf += offset;
     } else {
-        send_buf = (char *) queue->buf + root * queue->buf_extent + offset;
+        send_buf += root * queue->buf_extent + offset;
     }
     mpi_errno = MPIC_Isend(send_buf, nbytes, MPIR_BYTE_INTERNAL, peer_rank, queue->tag,
                            queue->comm, req, queue->coll_attr);
@@ -850,6 +940,24 @@ static void *get_persist_packbuf(MPII_cga_request_queue * queue, int block, int
     return queue->pending_blocks[pending_id].persist_packbuf;
 }
 
+static bool is_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root)
+{
+    int pending_id = get_pending_id(queue, block, root);
+    MPIR_Assert(pending_id >= 0);
+    MPIR_Assert(queue->pending_blocks[pending_id].persist_packbuf);
+    return queue->pending_blocks[pending_id].persist_packbuf_loaded;
+}
+
+static bool is_in_staging(MPII_cga_request_queue * queue, int block, int root)
+{
+    if (!queue->need_staging) {
+        return false;
+    }
+    int pending_id = get_pending_id(queue, block, root);
+    MPIR_Assert(pending_id >= 0);
+    return queue->pending_blocks[pending_id].in_staging;
+}
+
 static void add_pending_req_id(MPII_cga_request_queue * queue, int block, int root, int req_id)
 {
     int pending_id = get_pending_id(queue, block, root);
@@ -875,21 +983,48 @@ static void add_persist_packbuf(MPII_cga_request_queue * queue, int block, int r
     int pending_id = get_pending_id(queue, block, root);
     MPIR_Assert(pending_id >= 0);
     queue->pending_blocks[pending_id].persist_packbuf = packbuf;
+    queue->pending_blocks[pending_id].persist_packbuf_loaded = false;
 }
 
-static int alloc_packbuf(void **packbuf_out)
+static void set_persist_packbuf_loaded(MPII_cga_request_queue * queue, int block, int root)
 {
-    int mpi_errno;
-    mpi_errno = MPIDU_genq_private_pool_force_alloc_cell(MPIR_cga_chunk_pool, packbuf_out);
+    int pending_id = get_pending_id(queue, block, root);
+    MPIR_Assert(pending_id >= 0);
+    MPIR_Assert(queue->pending_blocks[pending_id].persist_packbuf);
+    queue->pending_blocks[pending_id].persist_packbuf_loaded = true;
+}
+
+static void set_in_staging(MPII_cga_request_queue * queue, int block, int root)
+{
+    int pending_id = get_pending_id(queue, block, root);
+    MPIR_Assert(pending_id >= 0);
+    queue->pending_blocks[pending_id].in_staging = true;
+}
+
+static int alloc_packbuf(MPII_cga_request_queue * queue, void **packbuf_out)
+{
+    int mpi_errno = MPI_SUCCESS;
+    if (MPIR_cga_chunk_pool) {
+        mpi_errno = MPIDU_genq_private_pool_force_alloc_cell(MPIR_cga_chunk_pool, packbuf_out);
+    } else {
+        *packbuf_out = MPL_malloc(queue->chunk_size, MPL_MEM_COLL);
+        if (!(*packbuf_out)) {
+            MPIR_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**nomem");
+        }
+    }
     return mpi_errno;
 }
 
 static void clear_request(MPII_cga_request_queue * queue, int req_id)
 {
 #define REQi queue->requests[req_id]
-    if (queue->coll_type == MPII_CGA_REDUCE) {
+    if (REQi.coll_type == MPII_CGA_REDUCE) {
         if (REQi.packbuf) {
-            MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, REQi.packbuf);
+            if (MPIR_cga_chunk_pool) {
+                MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, REQi.packbuf);
+            } else {
+                MPL_free(REQi.packbuf);
+            }
             REQi.packbuf = NULL;
         }
         if (REQi.tmpbuf) {
@@ -904,9 +1039,14 @@ static void clear_pending(MPII_cga_request_queue * queue, int pending_id)
 {
 #define PENDING queue->pending_blocks[pending_id]
     if (PENDING.persist_packbuf) {
-        MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, PENDING.persist_packbuf);
+        if (MPIR_cga_chunk_pool) {
+            MPIDU_genq_private_pool_free_cell(MPIR_cga_chunk_pool, PENDING.persist_packbuf);
+        } else {
+            MPL_free(PENDING.persist_packbuf);
+        }
         PENDING.persist_packbuf = NULL;
     }
+    PENDING.in_staging = false;
 
 #undef PENDING
 }
@@ -938,10 +1078,18 @@ static int clear_pending_recvs(MPII_cga_request_queue * queue, int cur_req_id, b
     int root = queue->requests[cur_req_id].root;
     int req_id = get_pending_req_id(queue, block, root);
 
-    if (req_id >= 0 && req_id != cur_req_id && queue->requests[req_id].op_type == MPII_CGA_OP_RECV) {
-        *flag = (queue->requests[req_id].op_stage == MPII_CGA_STAGE_NULL);
-    } else {
-        *flag = true;
+    while (true) {
+        MPIR_Assert(req_id >= 0);
+        if (req_id == cur_req_id) {
+            *flag = true;
+            break;
+        }
+        if (queue->requests[req_id].op_type == MPII_CGA_OP_RECV &&
+            queue->requests[req_id].op_stage != MPII_CGA_STAGE_UNSTAGE) {
+            *flag = false;
+            break;
+        }
+        req_id = queue->requests[req_id].next_req_id;
     }
 
     return MPI_SUCCESS;
@@ -965,12 +1113,24 @@ static int check_pending_ops(MPII_cga_request_queue * queue, int cur_req_id, boo
         if (req_id < 0) {
            req_id = queue->q_len - 1;
         }
-        /* we only need check 1 previous request since it is an all or none condition */
+        /* The goal is to ensure all sends or recvs to the same rank are issued in order.
+         * Thus, before issuing, we check that no previous send (or recv) is still in an early stage.
+         */
         if (queue->requests[req_id].op_stage != MPII_CGA_STAGE_NULL &&
             queue->requests[req_id].op_type == op_type &&
             queue->requests[req_id].peer_rank == peer_rank) {
-            *flag = false;
-            goto fn_exit;
+            if (op_type == MPII_CGA_OP_SEND) {
+                if (queue->requests[req_id].op_stage == MPII_CGA_STAGE_START ||
+                    queue->requests[req_id].op_stage == MPII_CGA_STAGE_COPY) {
+                    *flag = false;
+                    goto fn_exit;
+                }
+            } else {    /* MPII_CGA_OP_RECV */
+                if (queue->requests[req_id].op_stage == MPII_CGA_STAGE_START) {
+                    *flag = false;
+                    goto fn_exit;
+                }
+            }
         }
     }
 
@@ -1030,23 +1190,41 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i)
         if (REQi.op_type == MPII_CGA_OP_SEND) {
             /* send need clear previous recvs or the data is incorrect */
             TEST_PENDING(clear_pending_recvs(queue, i, &flag));
-            if (queue->need_pack) {
-                void *pack_buf = get_persist_packbuf(queue, block, root);
-                if (!pack_buf) {
-                    mpi_errno = alloc_packbuf(&pack_buf);
+            /* with contig staging_buf, we can skip pack_buf, but only in REDUCE or on rank 0 */
+#define IN_REDUCE_OR_RANK_0 (REQi.coll_type == MPII_CGA_REDUCE || queue->comm->rank == 0)
+#define CONTIG_AND_IN_STAGING (queue->dt_contig && is_in_staging(queue, block, root))
+            if (queue->need_pack && !(IN_REDUCE_OR_RANK_0 && CONTIG_AND_IN_STAGING)) {
+                void *pack_buf = NULL;
+                bool new_pack_buf = false;
+                if (REQi.coll_type == MPII_CGA_REDUCE) {
+                    /* reduce can't reuse the pack buffer */
+                    mpi_errno = alloc_packbuf(queue, &pack_buf);
                     MPIR_ERR_CHECK(mpi_errno);
 
                     REQi.packbuf = pack_buf;
-                    if (queue->coll_type == MPII_CGA_BCAST) {
+                    new_pack_buf = true;
+                } else {        /* MPII_CGA_BCAST */
+                    pack_buf = get_persist_packbuf(queue, block, root);
+                    if (!pack_buf) {
+                        mpi_errno = alloc_packbuf(queue, &pack_buf);
+                        MPIR_ERR_CHECK(mpi_errno);
+
+                        REQi.packbuf = pack_buf;
                         add_persist_packbuf(queue, block, root, pack_buf);
+                        new_pack_buf = true;
+                    } else {
+                        /* make sure the packbuf is loaded */
+                        if (!is_persist_packbuf_loaded(queue, block, root)) {
+                            goto fn_cont;
+                        }
+                        REQi.packbuf = pack_buf;
                     }
-
+                }
+                if (new_pack_buf) {
                     mpi_errno = issue_pack(queue, block, root, pack_buf, &REQi.u.async_req);
                     MPIR_ERR_CHECK(mpi_errno);
                     REQi.op_stage = MPII_CGA_STAGE_COPY;
                 } else {
-                    MPIR_Assert(queue->coll_type == MPII_CGA_BCAST);
-                    REQi.packbuf = pack_buf;
                     /* make sure all sends are in order */
                     TEST_PENDING(check_pending_ops(queue, i, &flag));
                     mpi_errno = issue_isend_packed(queue, block, root, REQi.peer_rank,
@@ -1063,15 +1241,23 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i)
                 REQi.op_stage = MPII_CGA_STAGE_REQUEST;
             }
         } else {        /* MPII_CGA_OP_RECV */
+            if (REQi.coll_type == MPII_CGA_REDUCE && queue->need_staging &&
+                !is_in_staging(queue, block, root)) {
+                /* issue concurrent loading of staging buffer */
+                mpi_errno = issue_staging(queue, block, root, true /* is_load */ ,
+                                          &REQi.staging_areq);
+                MPIR_ERR_CHECK(mpi_errno);
+                set_in_staging(queue, block, root);
+            }
             if (queue->need_pack) {
                 /* make sure all recvs are in order */
                 TEST_PENDING(check_pending_ops(queue, i, &flag));
 
                 void *pack_buf;
-                mpi_errno = alloc_packbuf(&pack_buf);
+                mpi_errno = alloc_packbuf(queue, &pack_buf);
                 MPIR_ERR_CHECK(mpi_errno);
 
                 REQi.packbuf = pack_buf;
-                if (queue->coll_type == MPII_CGA_BCAST) {
+                if (REQi.coll_type == MPII_CGA_BCAST) {
                     add_persist_packbuf(queue, block, root, pack_buf);
                 }
@@ -1100,13 +1286,16 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i)
         if (REQi.op_type == MPII_CGA_OP_SEND) {
             /* make sure all sends are in order */
             TEST_PENDING(check_pending_ops(queue, i, &flag));
+            if (REQi.coll_type == MPII_CGA_BCAST) {
+                set_persist_packbuf_loaded(queue, block, root);
+            }
             mpi_errno = issue_isend_packed(queue, block, root, REQi.peer_rank,
                                            REQi.packbuf, &REQi.u.req);
             MPIR_ERR_CHECK(mpi_errno);
             REQi.issued = true;
             REQi.op_stage = MPII_CGA_STAGE_REQUEST;
         } else {
-            if (queue->coll_type == MPII_CGA_REDUCE) {
+            if (REQi.coll_type == MPII_CGA_REDUCE) {
                 /* can't have multiple reduce into the same buffer */
                 TEST_PENDING(clear_pending_recvs(queue, i, &flag));
                 /* blocking reduce will be done in the next loop.
@@ -1120,6 +1309,14 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i)
         if (!MPIR_Request_is_complete(REQi.u.req)) {
             goto fn_cont;
         }
+        /* a reduce recv may have issued a concurrent load of staging_buf; check it before proceeding */
+        if (REQi.staging_areq.type != MPIR_NULL_REQUEST) {
+            int done;
+            MPIR_async_test(&REQi.staging_areq, &done);
+            if (!done) {
+                goto fn_cont;
+            }
+        }
 
         /* -- transition -- */
         MPIR_Assert(REQi.u.req->status.MPI_ERROR == MPI_SUCCESS);
@@ -1129,7 +1326,10 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i)
             goto fn_complete;
         } else {
             if (queue->need_pack) {
-                if (queue->coll_type == MPII_CGA_REDUCE && !REQi.tmpbuf) {
+                if (REQi.coll_type == MPII_CGA_BCAST) {
+                    set_persist_packbuf_loaded(queue, block, root);
+                }
+                if (REQi.coll_type == MPII_CGA_REDUCE && !REQi.tmpbuf) {
                     /* contig (gpu) case, we can directly reduce from pack_buf */
                     REQi.op_stage = MPII_CGA_STAGE_REDUCE;
                 } else {
@@ -1140,7 +1340,7 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i)
                     REQi.op_stage = MPII_CGA_STAGE_COPY;
                 }
             } else {
-                if (queue->coll_type == MPII_CGA_REDUCE) {
+                if (REQi.coll_type == MPII_CGA_REDUCE) {
                     REQi.op_stage = MPII_CGA_STAGE_REDUCE;
                 } else {
                     goto fn_complete;
@@ -1155,6 +1355,21 @@ static int progress_for_request(MPII_cga_request_queue * queue, int i)
         mpi_errno = reduce_local(queue, block, buf);
         MPIR_ERR_CHECK(mpi_errno);
 
+        if (queue->need_staging && queue->comm->rank == 0) {
+            mpi_errno = issue_staging(queue, block, root, false /* unstaging */ ,
+                                      &REQi.u.async_req);
+            MPIR_ERR_CHECK(mpi_errno);
+
+            REQi.op_stage = MPII_CGA_STAGE_UNSTAGE;
+        } else {
+            goto fn_complete;
+        }
+    } else if (REQi.op_stage == MPII_CGA_STAGE_UNSTAGE) {
+        int done;
+        MPIR_async_test(&REQi.u.async_req, &done);
+        if (!done) {
+            goto fn_cont;
+        }
         goto fn_complete;
     } else {
         MPIR_Assert(0);
@@ -1231,10 +1446,10 @@ static void debug_queue(MPII_cga_request_queue * queue)
 
 /* ---- math routines ---- */
 
-static int calc_chunks(MPI_Aint buf_size, MPI_Aint chunk_size, int *last_msg_size_out)
+static MPI_Aint calc_chunks(MPI_Aint buf_size, MPI_Aint chunk_size, MPI_Aint * last_msg_size_out)
 {
-    int n;
-    int last_msg_size;
+    MPI_Aint n;
+    MPI_Aint last_msg_size;
 
     /* note: bcast zero sized messages is valid */
     if (chunk_size == 0 || buf_size == 0) {
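Taken together, the staging path added throughout this file amounts to: a reduce recv first loads the device block into the host staging_buf (issue_staging with is_load set), reduce_local then accumulates into that host copy, and on rank 0 the finished block is unstaged back to the device buffer (MPII_CGA_STAGE_UNSTAGE) before the subsequent bcast phase reads it. A host-only sketch of that flow, with memcpy standing in for the asynchronous MPIR_Ilocalcopy_gpu and a contiguous double datatype assumed:

    #include <string.h>
    #include <stdio.h>

    #define CHUNK 4
    #define COUNT 8

    /* memcpy stands in for the async GPU copies; the real code overlaps
     * these with sends/recvs through MPIR_gpu_req staging_areq */
    static void stage_load(double *staging, const double *devbuf, int block)
    {
        memcpy(staging + block * CHUNK, devbuf + block * CHUNK, CHUNK * sizeof(double));
    }

    static void unstage(double *devbuf, const double *staging, int block)
    {
        memcpy(devbuf + block * CHUNK, staging + block * CHUNK, CHUNK * sizeof(double));
    }

    int main(void)
    {
        double devbuf[COUNT] = { 1, 1, 1, 1, 2, 2, 2, 2 }; /* pretend GPU recvbuf */
        double staging[COUNT];  /* host staging_buf */
        double incoming[CHUNK] = { 10, 10, 10, 10 };    /* a received chunk */

        for (int block = 0; block < COUNT / CHUNK; block++) {
            stage_load(staging, devbuf, block); /* issue_staging(..., is_load=true) */
            for (int j = 0; j < CHUNK; j++)
                staging[block * CHUNK + j] += incoming[j];      /* reduce_local into staging */
            unstage(devbuf, staging, block);    /* rank 0 only: STAGE_UNSTAGE */
        }
        printf("%g %g\n", devbuf[0], devbuf[4]);        /* prints: 11 12 */
        return 0;
    }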
diff --git a/src/mpi/coll/algorithms/circ_graph/circ_graph.h b/src/mpi/coll/algorithms/circ_graph/circ_graph.h
index 9ed8bcf440e..98bfbeba544 100644
--- a/src/mpi/coll/algorithms/circ_graph/circ_graph.h
+++ b/src/mpi/coll/algorithms/circ_graph/circ_graph.h
@@ -56,6 +56,7 @@ enum MPII_cga_op_stage {
     MPII_CGA_STAGE_COPY,        /* waiting for the async local copy */
     MPII_CGA_STAGE_REQUEST,     /* waiting for the send/recv request */
     MPII_CGA_STAGE_REDUCE,      /* at reduce_local pending dependency */
+    MPII_CGA_STAGE_UNSTAGE,     /* when allreduce staging is used, rank 0 needs to unstage after reduce_local */
 };
 
 typedef struct {
@@ -69,7 +70,8 @@ typedef struct {
     MPI_Aint count;
     MPI_Datatype datatype;
     MPI_Aint buf_extent;        /* count * extent, needed by allgather */
-
+    bool dt_contig;
+    bool need_staging;
     bool need_pack;
     MPL_pointer_attr_t attr;
 
@@ -80,8 +82,10 @@ typedef struct {
             MPI_Aint type_extent;
             MPI_Aint chunk_extent;      /* for calc buf offset at a block */
             MPI_Aint true_lb;   /* adjustment for tmp_buf */
+            MPI_Aint true_extent;
            MPI_Aint tmpbuf_size;       /* (chunk_count - 1) * extent + true_extent, but 0 if not needed */
             MPI_Op op;
+            void *staging_buf;  /* if recvbuf is on a GPU, use staging buf for intermediate results */
         } reduce;
     } u;
 
@@ -94,6 +98,8 @@ typedef struct {
     struct {
         int req_id;             /* points to the index of the pending requests */
         void *persist_packbuf;  /* for bcast, avoid packing for every send */
+        bool persist_packbuf_loaded;    /* avoid sending an unloaded packbuf */
+        bool in_staging;        /* for allreduce, whether data is in staging_buf */
     } *pending_blocks;
     int pending_head;
     int pending_head_block;
@@ -101,6 +107,7 @@ typedef struct {
 
     struct {
         enum MPII_cga_op_type op_type;
+        enum MPII_cga_type coll_type;
         enum MPII_cga_op_stage op_stage;
         /* sends and recvs need be issued in order. It is difficult to figure out whether
          * the request has been issued yet from op_stage. Use an explicit flag as shortcut. */
@@ -109,6 +116,7 @@ typedef struct {
             MPIR_gpu_req async_req;
             MPIR_Request *req;
         } u;
+        MPIR_gpu_req staging_areq;      /* allreduce recv may issue concurrent loading of staging buffer */
         void *packbuf;          /* if need_pack, allocated chunk buffer */
         void *tmpbuf;           /* reduce need recv into a tmpbuf before reduce_local */
         int block;
@@ -137,6 +145,10 @@ int MPII_cga_init_allgather_queue(MPII_cga_request_queue * queue, int num_pendin
 int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending,
                                void *recvbuf, MPI_Aint count, MPI_Datatype datatype,
                                MPI_Op op, MPIR_Comm * comm, int coll_attr);
+int MPII_cga_init_allreduce_queue(MPII_cga_request_queue * queue, int num_pending,
+                                  void *recvbuf, MPI_Aint count, MPI_Datatype datatype,
+                                  MPI_Op op, MPIR_Comm * comm, int coll_attr);
+int MPII_cga_switch_coll_type(MPII_cga_request_queue * queue, enum MPII_cga_type coll_type);
 
 int MPII_cga_bcast_isend(MPII_cga_request_queue * queue, int block, int peer_rank, bool * flag);
 int MPII_cga_bcast_irecv(MPII_cga_request_queue * queue, int block, int peer_rank, bool * flag);
diff --git a/src/mpi/coll/allreduce/Makefile.mk b/src/mpi/coll/allreduce/Makefile.mk
index 5db420bd059..d1b81f2c874 100644
--- a/src/mpi/coll/allreduce/Makefile.mk
+++ b/src/mpi/coll/allreduce/Makefile.mk
@@ -18,4 +18,5 @@ mpi_core_sources += \
     src/mpi/coll/allreduce/allreduce_intra_ring.c \
     src/mpi/coll/allreduce/allreduce_intra_k_reduce_scatter_allgather.c \
     src/mpi/coll/allreduce/allreduce_intra_ccl.c \
+    src/mpi/coll/allreduce/allreduce_intra_circ_graph.c \
     src/mpi/coll/allreduce/allreduce_inter_reduce_exchange_bcast.c
diff --git a/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c b/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c
new file mode 100644
index 00000000000..cceda830bc1
--- /dev/null
+++ b/src/mpi/coll/allreduce/allreduce_intra_circ_graph.c
@@ -0,0 +1,136 @@
+/*
+ * Copyright (C) by Argonne National Laboratory
+ *     See COPYRIGHT in top-level directory
+ */
+
+#include "mpiimpl.h"
+#include "circ_graph.h"
+
+/* Algorithm: Circulant graph allreduce
+ *
+ * This algorithm is a combination of reduce_circ_graph + bcast_circ_graph.
+ *
+ * It is not round-efficient for small to medium messages, but can be efficient for
+ * large messages when there are enough chunks to saturate the pipeline.
+ */
+
+int MPIR_Allreduce_intra_circ_graph(const void *sendbuf, void *recvbuf,
+                                    MPI_Aint count, MPI_Datatype datatype,
+                                    MPI_Op op, MPIR_Comm * comm, int coll_attr)
+{
+    int mpi_errno = MPI_SUCCESS;
+    MPIR_CHKLMEM_DECL();
+
+    int rank, comm_size;
+    MPIR_COMM_RANK_SIZE(comm, rank, comm_size);
+
+    MPIR_Assert(MPIR_Op_is_commutative(op));
+
+    if (sendbuf != MPI_IN_PLACE) {
+        mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, recvbuf, count, datatype);
+        MPIR_ERR_CHECK(mpi_errno);
+    }
+
+    /* calculate the schedule */
+    MPII_circ_graph cga;
+    mpi_errno = MPII_circ_graph_create(&cga, comm_size, rank);
+    MPIR_ERR_CHECK(mpi_errno);
+
+    /* Run schedule */
+    MPII_cga_request_queue queue;
+    int min_pending_blocks = cga.q * 2;
+    mpi_errno = MPII_cga_init_allreduce_queue(&queue, min_pending_blocks,
+                                              recvbuf, count, datatype, op, comm, coll_attr);
+    MPIR_ERR_CHECK(mpi_errno);
+
+    /* First run the reduce schedule */
+    int n = queue.num_chunks;
+    int p = cga.p;
+    int q = cga.q;
+    int x = (q - ((n - 1) % q)) % q;
+    int offset = n - 1;
+
+    for (int i = n + q + x - 2; i >= x; i--) {
+        int k = i % q;
+
+        int send_block = cga.R[k] + offset;
+        if (send_block >= 0) {
+            int peer = (rank - cga.Skip[k] + p) % p;
+            if (rank != 0) {
+                if (send_block >= n) {
+                    send_block = n - 1;
+                }
+
+                mpi_errno = MPII_cga_reduce_send(&queue, send_block, peer);
+                MPIR_ERR_CHECK(mpi_errno);
+            }
+        }
+
+        int recv_block = cga.S[k] + offset;
+        if (recv_block >= 0) {
+            int peer = (rank + cga.Skip[k]) % p;
+            if (peer != 0) {
+                if (recv_block >= n) {
+                    recv_block = n - 1;
+                }
+
+                mpi_errno = MPII_cga_reduce_recv(&queue, recv_block, peer);
+                MPIR_ERR_CHECK(mpi_errno);
+            }
+        }
+
+        if (k == 0) {
+            offset -= q;
+        }
+    }
+
+    MPII_cga_switch_coll_type(&queue, MPII_CGA_BCAST);
+    offset = -x;
+
+    /* Then run the bcast schedule */
+    for (int i = x; i < n - 1 + q + x; i++) {
+        int k = i % q;
+
+        int send_block = cga.S[k] + offset;
+        if (send_block >= 0) {
+            int peer = (rank + cga.Skip[k]) % p;
+            if (peer != 0) {
+                if (send_block >= n) {
+                    send_block = n - 1;
+                }
+
+                mpi_errno = MPII_cga_bcast_send(&queue, send_block, peer);
+                MPIR_ERR_CHECK(mpi_errno);
+            }
+        }
+
+        int recv_block = cga.R[k] + offset;
+        if (recv_block >= 0) {
+            int peer = (rank - cga.Skip[k] + p) % p;
+            if (rank != 0) {
+                if (recv_block >= n) {
+                    recv_block = n - 1;
+                }
+
+                mpi_errno = MPII_cga_bcast_recv(&queue, recv_block, peer);
+                MPIR_ERR_CHECK(mpi_errno);
+            }
+        }
+
+        if (k == q - 1) {
+            offset += q;
+        }
+    }
+
+    /* wait for all pending requests */
+    mpi_errno = MPII_cga_waitall(&queue);
+    MPIR_ERR_CHECK(mpi_errno);
+
+    MPII_circ_graph_free(&cga);
+    MPIR_CHKLMEM_FREEALL();
+
+  fn_exit:
+    return mpi_errno;
+  fn_fail:
+    goto fn_exit;
+}
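Semantically, the two loops above compose an allreduce from a reduce toward rank 0 followed by a bcast from rank 0, with MPII_cga_switch_coll_type flipping the queue between the phases. For reference, the unpipelined MPI equivalent of that composition (the circ_graph version additionally chunks the buffer so both phases stream through the request queue; the new file asserts a commutative op for its schedule):

    #include <mpi.h>
    #include <stdio.h>

    /* Reduce-to-root + bcast-from-root: what reduce_circ_graph +
     * bcast_circ_graph compose to, without the chunking/pipelining */
    int main(int argc, char **argv)
    {
        MPI_Init(&argc, &argv);
        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        long val = rank + 1, sum = 0;
        MPI_Reduce(&val, &sum, 1, MPI_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
        MPI_Bcast(&sum, 1, MPI_LONG, 0, MPI_COMM_WORLD);

        printf("rank %d: sum = %ld\n", rank, sum);
        MPI_Finalize();
        return 0;
    }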
diff --git a/src/mpi/coll/coll_algorithms.txt b/src/mpi/coll/coll_algorithms.txt
index e0c9edeec60..3feeaf568ff 100644
--- a/src/mpi/coll/coll_algorithms.txt
+++ b/src/mpi/coll/coll_algorithms.txt
@@ -364,6 +364,8 @@ allreduce-intra:
     ccl
         extra_params: ccl
         cvar_params: CCL
+    circ_graph
+        restrictions: commutative
 allreduce-inter:
     reduce_exchange_bcast
 iallreduce-intra:
diff --git a/src/mpi/coll/cvars.txt b/src/mpi/coll/cvars.txt
index f8b3ab97bea..b166fff6fd8 100644
--- a/src/mpi/coll/cvars.txt
+++ b/src/mpi/coll/cvars.txt
@@ -1374,6 +1374,7 @@ cvars:
         ring                       - Force ring algorithm
         k_reduce_scatter_allgather - Force reduce scatter allgather algorithm
         ccl                        - Force CCL algorithm
+        circ_graph                 - Force circulant graph algorithm
 
     - name        : MPIR_CVAR_ALLREDUCE_RECURSIVE_MULTIPLYING_KVAL
       category    : COLLECTIVE
diff --git a/src/mpi/coll/include/csel_container.h b/src/mpi/coll/include/csel_container.h
index 4df1ea8b0f6..15aa846efa6 100644
--- a/src/mpi/coll/include/csel_container.h
+++ b/src/mpi/coll/include/csel_container.h
@@ -34,6 +34,7 @@ typedef enum {
     MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ring,
     MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_k_reduce_scatter_allgather,
     MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ccl,
+    MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_circ_graph,
     MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_inter_reduce_exchange_bcast,
     MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_allcomm_nb,
     MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Alltoall_intra_brucks,
diff --git a/src/mpi/coll/src/csel_container.c b/src/mpi/coll/src/csel_container.c
index 6a352782417..d53c67f8258 100644
--- a/src/mpi/coll/src/csel_container.c
+++ b/src/mpi/coll/src/csel_container.c
@@ -431,6 +431,8 @@ void *MPII_Create_container(struct json_object *obj)
                 MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_k_reduce_scatter_allgather;
         else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_intra_ccl"))
             cnt->id = MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ccl;
+        else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_intra_circ_graph"))
+            cnt->id = MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_circ_graph;
         else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_inter_reduce_exchange_bcast"))
             cnt->id =
                 MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_inter_reduce_exchange_bcast;
diff --git a/test/mpi/coll/allred.c b/test/mpi/coll/allred.c
index b44807f6696..e40b1a2be94 100644
--- a/test/mpi/coll/allred.c
+++ b/test/mpi/coll/allred.c
@@ -230,7 +230,7 @@ static void set_index_sum(MPI_Datatype mpi_type, int category, void *arr, int va
     int type_size = get_mpi_type_size(mpi_type);
     char *p = arr;
     for (int i = 0; i < count; i++) {
-        set_category_value(category, p, type_size, i + val);
+        set_category_value(category, p, type_size, (i % 10) + val);
         p += type_size;
     }
 }
@@ -240,7 +240,7 @@ static void set_index_factor(MPI_Datatype mpi_type, int category, void *arr, int
     int type_size = get_mpi_type_size(mpi_type);
     char *p = arr;
     for (int i = 0; i < count; i++) {
-        set_category_value(category, p, type_size, i * val);
+        set_category_value(category, p, type_size, (i % 10) * val);
         p += type_size;
     }
 }
@@ -259,7 +259,7 @@ static void set_index_power(MPI_Datatype mpi_type, int category, void *arr, int
     int type_size = get_mpi_type_size(mpi_type);
     char *p = arr;
     for (int i = 0; i < count; i++) {
-        set_category_value(category, p, type_size, get_pow(i, val));
+        set_category_value(category, p, type_size, get_pow(i % 10, val));
         p += type_size;
     }
 }
diff --git a/test/mpi/maint/coll_cvars.txt b/test/mpi/maint/coll_cvars.txt
index d3c4ba0bad6..86e8ed7ddf0 100644
--- a/test/mpi/maint/coll_cvars.txt
+++ b/test/mpi/maint/coll_cvars.txt
@@ -257,6 +257,9 @@ algorithms:
         .MPIR_CVAR_COLL_SHM_LIMIT_PER_NODE=131072
         .MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE=16384
         .MPIR_CVAR_REDUCE_INTRANODE_TREE_KVAL=4,8
+    circ_graph
+        .MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE=0,1024,262144
+        .MPIR_CVAR_CIRC_GRAPH_Q_LEN=2,8
 intra-nonblocking:
     sched_naive
     sched_smp
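The coll_cvars.txt entries above define the test sweep. To exercise the algorithm by hand, the same cvars can be set in the environment; assuming the allreduce algorithm selector cvar whose value list is extended in the cvars.txt hunk earlier (the binary name here is a hypothetical build of the allred test):

    MPIR_CVAR_ALLREDUCE_INTRA_ALGORITHM=circ_graph \
    MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE=262144 \
    MPIR_CVAR_CIRC_GRAPH_Q_LEN=8 \
    mpiexec -n 8 ./allred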