Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 263 additions & 48 deletions src/mpi/coll/algorithms/circ_graph/cga_request_queue.c

Large diffs are not rendered by default.

14 changes: 13 additions & 1 deletion src/mpi/coll/algorithms/circ_graph/circ_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum MPII_cga_op_stage {
MPII_CGA_STAGE_COPY, /* waiting for the async local copy */
MPII_CGA_STAGE_REQUEST, /* waiting for the send/recv request */
MPII_CGA_STAGE_REDUCE, /* at reduce_local pending dependency */
MPII_CGA_STAGE_UNSTAGE, /* when allreduce staging is used, rank 0 needs to unstage after reduce_local */
};

typedef struct {
Expand All @@ -69,7 +70,8 @@ typedef struct {
MPI_Aint count;
MPI_Datatype datatype;
MPI_Aint buf_extent; /* count * extent, needed by allgather */

bool dt_contig;
bool need_staging;
bool need_pack;
MPL_pointer_attr_t attr;

Expand All @@ -80,8 +82,10 @@ typedef struct {
MPI_Aint type_extent;
MPI_Aint chunk_extent; /* for calc buf offset at a block */
MPI_Aint true_lb; /* adjustment for tmp_buf */
MPI_Aint true_extent;
MPI_Aint tmpbuf_size; /* (chunk_count - 1) * extent + true_extent, but 0 if not needed */
MPI_Op op;
void *staging_buf; /* if recvbuf is in GPU, use staging buf for intermediate results */
} reduce;
} u;

Expand All @@ -94,13 +98,16 @@ typedef struct {
struct {
int req_id; /* points to the index of the pending requests */
void *persist_packbuf; /* for bcast, avoid packing for every send */
bool persist_packbuf_loaded; /* avoid sending unloaded packbuf */
bool in_staging; /* for allreduce, whether data is in staging_buf */
} *pending_blocks;
int pending_head;
int pending_head_block;
bool inverse_order;

struct {
enum MPII_cga_op_type op_type;
enum MPII_cga_type coll_type;
enum MPII_cga_op_stage op_stage;
/* sends and recvs need to be issued in order. It is difficult to figure out from
* op_stage alone whether the request has been issued yet. Use an explicit flag as a shortcut. */
Expand All @@ -109,6 +116,7 @@ typedef struct {
MPIR_gpu_req async_req;
MPIR_Request *req;
} u;
MPIR_gpu_req staging_areq; /* allreduce recv may issue concurrent loading of staging buffer */
void *packbuf; /* if need_pack, allocated chunk buffer */
void *tmpbuf; /* reduce need recv into a tmpbuf before reduce_local */
int block;
Expand Down Expand Up @@ -137,6 +145,10 @@ int MPII_cga_init_allgather_queue(MPII_cga_request_queue * queue, int num_pendin
int MPII_cga_init_reduce_queue(MPII_cga_request_queue * queue, int num_pending,
void *recvbuf, MPI_Aint count, MPI_Datatype datatype,
MPI_Op op, MPIR_Comm * comm, int coll_attr);
int MPII_cga_init_allreduce_queue(MPII_cga_request_queue * queue, int num_pending,
void *recvbuf, MPI_Aint count, MPI_Datatype datatype,
MPI_Op op, MPIR_Comm * comm, int coll_attr);
int MPII_cga_switch_coll_type(MPII_cga_request_queue * queue, enum MPII_cga_type coll_type);

int MPII_cga_bcast_isend(MPII_cga_request_queue * queue, int block, int peer_rank, bool * flag);
int MPII_cga_bcast_irecv(MPII_cga_request_queue * queue, int block, int peer_rank, bool * flag);
Expand Down
1 change: 1 addition & 0 deletions src/mpi/coll/allreduce/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ mpi_core_sources += \
src/mpi/coll/allreduce/allreduce_intra_ring.c \
src/mpi/coll/allreduce/allreduce_intra_k_reduce_scatter_allgather.c \
src/mpi/coll/allreduce/allreduce_intra_ccl.c \
src/mpi/coll/allreduce/allreduce_intra_circ_graph.c \
src/mpi/coll/allreduce/allreduce_inter_reduce_exchange_bcast.c
136 changes: 136 additions & 0 deletions src/mpi/coll/allreduce/allreduce_intra_circ_graph.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/

#include "mpiimpl.h"
#include "circ_graph.h"

/* Algorithm: Circulant graph allreduce
*
* This algorithm is a combination of reduce_circ_graph + bcast_circ_graph.
*
 * It is not round-efficient for small to medium messages, but can be efficient for
 * large messages when there are enough chunks to saturate the pipeline.
*/

/* Circulant-graph allreduce: run the reduce-to-root schedule (all data reduced
 * toward rank 0), then replay the schedule in reverse as a bcast from rank 0.
 *
 * Parameters follow the standard MPIR_Allreduce contract:
 *   sendbuf   - source buffer, or MPI_IN_PLACE to reduce recvbuf directly
 *   recvbuf   - destination buffer; holds the full reduction on return
 *   count     - number of elements of 'datatype'
 *   op        - reduction op; must be commutative (asserted below)
 *
 * Returns MPI_SUCCESS or an MPI error code. */
int MPIR_Allreduce_intra_circ_graph(const void *sendbuf, void *recvbuf,
                                    MPI_Aint count, MPI_Datatype datatype,
                                    MPI_Op op, MPIR_Comm * comm, int coll_attr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_CHKLMEM_DECL();

    int rank, comm_size;
    MPIR_COMM_RANK_SIZE(comm, rank, comm_size);

    /* The reduce phase reorders partial reductions; only valid for commutative ops
     * (matches the "restrictions: commutative" entry in coll_algorithms.txt). */
    MPIR_Assert(MPIR_Op_is_commutative(op));

    if (sendbuf != MPI_IN_PLACE) {
        mpi_errno = MPIR_Localcopy(sendbuf, count, datatype, recvbuf, count, datatype);
        MPIR_ERR_CHECK(mpi_errno);
    }

    /* calculate the schedule */
    MPII_circ_graph cga;
    mpi_errno = MPII_circ_graph_create(&cga, comm_size, rank);
    MPIR_ERR_CHECK(mpi_errno);

    /* Run schedule.  Allow up to 2*q blocks in flight so both the send and recv
     * sides of the circulant schedule stay saturated. */
    MPII_cga_request_queue queue;
    int min_pending_blocks = cga.q * 2;
    mpi_errno = MPII_cga_init_allreduce_queue(&queue, min_pending_blocks,
                                              recvbuf, count, datatype, op, comm, coll_attr);
    MPIR_ERR_CHECK(mpi_errno);

    /* First run the reduce schedule.
     * n: number of pipeline chunks; q: schedule period; x: padding so the loop
     * length is a multiple of q; offset walks blocks from last to first. */
    int n = queue.num_chunks;
    int p = cga.p;
    int q = cga.q;
    int x = (q - ((n - 1) % q)) % q;
    int offset = n - 1;

    for (int i = n + q + x - 2; i >= x; i--) {
        int k = i % q;

        int send_block = cga.R[k] + offset;
        if (send_block >= 0) {
            int peer = (rank - cga.Skip[k] + p) % p;
            /* rank 0 is the reduce root; it never sends in this phase */
            if (rank != 0) {
                if (send_block >= n) {
                    send_block = n - 1;     /* clamp padded indices to the last chunk */
                }

                mpi_errno = MPII_cga_reduce_send(&queue, send_block, peer);
                MPIR_ERR_CHECK(mpi_errno);
            }
        }

        int recv_block = cga.S[k] + offset;
        if (recv_block >= 0) {
            int peer = (rank + cga.Skip[k]) % p;
            /* never receive from the root; it holds the final result */
            if (peer != 0) {
                if (recv_block >= n) {
                    recv_block = n - 1;
                }

                mpi_errno = MPII_cga_reduce_recv(&queue, recv_block, peer);
                MPIR_ERR_CHECK(mpi_errno);
            }
        }

        if (k == 0) {
            offset -= q;
        }
    }

    /* FIX: the return value was previously discarded; check it like every other
     * fallible call in this function. */
    mpi_errno = MPII_cga_switch_coll_type(&queue, MPII_CGA_BCAST);
    MPIR_ERR_CHECK(mpi_errno);
    offset = -x;

    /* Then run the bcast schedule: the reduce schedule replayed in reverse,
     * with S/R (send/recv) roles swapped. */
    for (int i = x; i < n - 1 + q + x; i++) {
        int k = i % q;

        int send_block = cga.S[k] + offset;
        if (send_block >= 0) {
            int peer = (rank + cga.Skip[k]) % p;
            if (peer != 0) {
                if (send_block >= n) {
                    send_block = n - 1;
                }

                mpi_errno = MPII_cga_bcast_send(&queue, send_block, peer);
                MPIR_ERR_CHECK(mpi_errno);
            }
        }

        int recv_block = cga.R[k] + offset;
        if (recv_block >= 0) {
            int peer = (rank - cga.Skip[k] + p) % p;
            if (rank != 0) {
                if (recv_block >= n) {
                    recv_block = n - 1;
                }

                mpi_errno = MPII_cga_bcast_recv(&queue, recv_block, peer);
                MPIR_ERR_CHECK(mpi_errno);
            }
        }

        if (k == q - 1) {
            offset += q;
        }
    }

    /* wait for all pending requests */
    mpi_errno = MPII_cga_waitall(&queue);
    MPIR_ERR_CHECK(mpi_errno);

    /* NOTE(review): on the fn_fail path cga (and the queue) are not freed —
     * this matches the original code, but looks like a leak on error; confirm
     * whether MPII_circ_graph_free is safe to call after a failed create. */
    MPII_circ_graph_free(&cga);
    MPIR_CHKLMEM_FREEALL();

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
2 changes: 2 additions & 0 deletions src/mpi/coll/coll_algorithms.txt
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ allreduce-intra:
ccl
extra_params: ccl
cvar_params: CCL
circ_graph
restrictions: commutative
allreduce-inter:
reduce_exchange_bcast
iallreduce-intra:
Expand Down
1 change: 1 addition & 0 deletions src/mpi/coll/cvars.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1374,6 +1374,7 @@ cvars:
ring - Force ring algorithm
k_reduce_scatter_allgather - Force reduce scatter allgather algorithm
ccl - Force CCL algorithm
circ_graph - Force circulant graph algorithm

- name : MPIR_CVAR_ALLREDUCE_RECURSIVE_MULTIPLYING_KVAL
category : COLLECTIVE
Expand Down
1 change: 1 addition & 0 deletions src/mpi/coll/include/csel_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef enum {
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ring,
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_k_reduce_scatter_allgather,
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ccl,
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_circ_graph,
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_inter_reduce_exchange_bcast,
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_allcomm_nb,
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Alltoall_intra_brucks,
Expand Down
2 changes: 2 additions & 0 deletions src/mpi/coll/src/csel_container.c
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ void *MPII_Create_container(struct json_object *obj)
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_k_reduce_scatter_allgather;
else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_intra_ccl"))
cnt->id = MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_ccl;
else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_intra_circ_graph"))
cnt->id = MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_intra_circ_graph;
else if (!strcmp(ckey, "algorithm=MPIR_Allreduce_inter_reduce_exchange_bcast"))
cnt->id =
MPII_CSEL_CONTAINER_TYPE__ALGORITHM__MPIR_Allreduce_inter_reduce_exchange_bcast;
Expand Down
6 changes: 3 additions & 3 deletions test/mpi/coll/allred.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ static void set_index_sum(MPI_Datatype mpi_type, int category, void *arr, int va
int type_size = get_mpi_type_size(mpi_type);
char *p = arr;
for (int i = 0; i < count; i++) {
set_category_value(category, p, type_size, i + val);
set_category_value(category, p, type_size, (i % 10) + val);
p += type_size;
}
}
Expand All @@ -240,7 +240,7 @@ static void set_index_factor(MPI_Datatype mpi_type, int category, void *arr, int
int type_size = get_mpi_type_size(mpi_type);
char *p = arr;
for (int i = 0; i < count; i++) {
set_category_value(category, p, type_size, i * val);
set_category_value(category, p, type_size, (i % 10) * val);
p += type_size;
}
}
Expand All @@ -259,7 +259,7 @@ static void set_index_power(MPI_Datatype mpi_type, int category, void *arr, int
int type_size = get_mpi_type_size(mpi_type);
char *p = arr;
for (int i = 0; i < count; i++) {
set_category_value(category, p, type_size, get_pow(i, val));
set_category_value(category, p, type_size, get_pow(i % 10, val));
p += type_size;
}
}
Expand Down
3 changes: 3 additions & 0 deletions test/mpi/maint/coll_cvars.txt
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,9 @@ algorithms:
.MPIR_CVAR_COLL_SHM_LIMIT_PER_NODE=131072
.MPIR_CVAR_REDUCE_INTRANODE_BUFFER_TOTAL_SIZE=16384
.MPIR_CVAR_REDUCE_INTRANODE_TREE_KVAL=4,8
circ_graph
.MPIR_CVAR_CIRC_GRAPH_CHUNK_SIZE=0,1024,262144
.MPIR_CVAR_CIRC_GRAPH_Q_LEN=2,8
intra-nonblocking:
sched_naive
sched_smp
Expand Down