diff --git a/fabtests/prov/efa/Makefile.include b/fabtests/prov/efa/Makefile.include
index 42bfabca7c5..e4aae2d8169 100644
--- a/fabtests/prov/efa/Makefile.include
+++ b/fabtests/prov/efa/Makefile.include
@@ -40,7 +40,8 @@ bin_PROGRAMS += prov/efa/src/fi_efa_rnr_read_cq_error \
 	prov/efa/src/fi_efa_multi_ep_stress \
 	prov/efa/src/fi_efa_mmap_test \
 	prov/efa/src/fi_efa_mr_test \
-	prov/efa/src/fi_efa_runt_read_no_handshake
+	prov/efa/src/fi_efa_runt_read_no_handshake \
+	prov/efa/src/fi_efa_rma_bw
 
 if HAVE_VERBS_DEVEL
 if HAVE_EFA_DV
@@ -107,6 +108,11 @@ prov_efa_src_fi_efa_runt_read_no_handshake_SOURCES = \
 	$(benchmarks_srcs)
 prov_efa_src_fi_efa_runt_read_no_handshake_LDADD = libfabtests.la
 
+prov_efa_src_fi_efa_rma_bw_SOURCES = \
+	prov/efa/src/efa_rma_bw.c \
+	$(benchmarks_srcs)
+prov_efa_src_fi_efa_rma_bw_LDADD = libfabtests.la
+
 if HAVE_VERBS_DEVEL
 if HAVE_EFA_DV
 efa_exhaust_mr_reg_srcs = \
diff --git a/fabtests/prov/efa/src/efa_rma_bw.c b/fabtests/prov/efa/src/efa_rma_bw.c
new file mode 100644
index 00000000000..a7b56716364
--- /dev/null
+++ b/fabtests/prov/efa/src/efa_rma_bw.c
@@ -0,0 +1,438 @@
+/* SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0-only */
+/* SPDX-FileCopyrightText: Copyright Amazon.com, Inc. or its affiliates. All rights reserved. */
+
+/*
+ * EFA-specific RMA bandwidth test.
+ *
+ * This test measures RMA bandwidth with support for EFA-specific features
+ * such as the FI_EFA_WR_HIGH_PPS flag. It currently supports write,
+ * writedata, and read operations.
+ *
+ * Unlike fi_rma_bw, this test uses a nonblocking benchmark loop that
+ * interleaves posting and completion polling to keep the pipeline full,
+ * similar to the approach used by rdma-core/perftest. This avoids blocking
+ * at window boundaries and maximizes throughput.
+ *
+ * Usage:
+ *   Server: fi_efa_rma_bw
+ *   Client: fi_efa_rma_bw -H
+ *
+ * Options:
+ *   --high-pps                 Enable FI_EFA_WR_HIGH_PPS flag on writes.
+ *   --post-list N              Batch N posts per doorbell using FI_MORE (default: 1).
+ *   -o write|writedata|read    Select RMA operation (default: write).
+ */
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include
+#include "benchmarks/benchmark_shared.h"
+
+
+#define EFA_RMA_BW_CQ_POLL_BATCH 16
+
+static int use_high_pps;
+static int post_list = 1;
+
+/*
+ * Post one RMA operation of the type selected by opts.rma_op.
+ *
+ * The remote address is remote->addr plus the offset of buf within the
+ * local buffer (rx_buf for reads, tx_buf for writes), so the local and
+ * remote offsets mirror each other. base_flags is passed to
+ * fi_readmsg()/fi_writemsg(); writes also get FI_EFA_WR_HIGH_PPS when
+ * --high-pps was given, and FI_REMOTE_CQ_DATA for writedata.
+ *
+ * Returns the fi_readmsg()/fi_writemsg() result.
+ */
+static ssize_t post_rma(char *buf, size_t size,
+			struct fi_rma_iov *remote, void *context,
+			uint64_t base_flags)
+{
+	struct fi_msg_rma msg;
+	struct iovec msg_iov;
+	struct fi_rma_iov rma_iov;
+	uint64_t flags = base_flags;
+	ssize_t ret;
+
+	msg_iov.iov_base = buf;
+	msg_iov.iov_len = size;
+	msg.msg_iov = &msg_iov;
+	msg.desc = &mr_desc;
+	msg.iov_count = 1;
+	rma_iov.addr = remote->addr + (buf - (opts.rma_op == FT_RMA_READ ? rx_buf : tx_buf));
+	rma_iov.len = size;
+	rma_iov.key = remote->key;
+	msg.rma_iov = &rma_iov;
+	msg.rma_iov_count = 1;
+	msg.addr = remote_fi_addr;
+	msg.context = context;
+
+	if (opts.rma_op == FT_RMA_READ) {
+		msg.data = 0;
+		ret = fi_readmsg(ep, &msg, flags);
+	} else {
+		if (use_high_pps)
+			flags |= FI_EFA_WR_HIGH_PPS;
+		if (opts.rma_op == FT_RMA_WRITEDATA) {
+			flags |= FI_REMOTE_CQ_DATA;
+			msg.data = remote_cq_data;
+		} else {
+			msg.data = 0;
+		}
+		ret = fi_writemsg(ep, &msg, flags);
+	}
+
+	return ret;
+}
+
+/*
+ * Drain a CQ without blocking.
+ *
+ * Reads completions in batches of EFA_RMA_BW_CQ_POLL_BATCH until
+ * fi_cq_read() stops returning entries, adding every batch to
+ * *completed_cnt and *cq_cntr.
+ *
+ * Returns the number of completions read by this call (possibly 0), the
+ * ft_cq_readerr() result on -FI_EAVAIL, or any other fi_cq_read() error
+ * except -FI_EAGAIN.
+ */
+static int bw_comp_nonblocking(struct fid_cq *cq, uint64_t *cq_cntr,
+			       int *completed_cnt)
+{
+	int ret, cnt = 0;
+	struct fi_cq_data_entry comp[EFA_RMA_BW_CQ_POLL_BATCH];
+
+	while ((ret = fi_cq_read(cq, comp, EFA_RMA_BW_CQ_POLL_BATCH)) > 0) {
+		(*completed_cnt) += ret;
+		(*cq_cntr) += ret;
+		cnt += ret;
+	}
+
+	if (ret == -FI_EAVAIL) {
+		ret = ft_cq_readerr(cq);
+		return ret;
+	}
+
+	if (ret < 0 && ret != -FI_EAGAIN) {
+		FT_PRINTERR("fi_cq_read", ret);
+		return ret;
+	}
+
+	return cnt;
+}
+
+/*
+ * Run one bandwidth measurement at opts.transfer_size and print the result.
+ *
+ * The writedata target (no opts.dst_addr) only polls rxcq and, when
+ * FI_RX_CQ_DATA is set, keeps up to opts.window_size receives posted.
+ * Every other case is an initiator that keeps up to opts.window_size
+ * operations in flight, posting and polling txcq in the same loop. The
+ * timer starts once opts.warmup_iterations completions have been counted.
+ *
+ * Returns 0 on success or the first error encountered.
+ */
+static int bandwidth_rma_efa(struct fi_rma_iov *remote)
+{
+	int ret, posted_cnt = 0, completed_cnt = 0;
+	size_t offset;
+	size_t rma_start_offset;
+	int total_iterations = opts.iterations + opts.warmup_iterations;
+	bool warmup_done = false;
+	char *buf;
+	uint64_t flags;
+
+	ret = ft_sync();
+	if (ret)
+		return ret;
+
+	rma_start_offset = FT_RMA_SYNC_MSG_BYTES +
+			   MAX(ft_tx_prefix_size(), ft_rx_prefix_size());
+
+	if (opts.rma_op == FT_RMA_WRITEDATA && !opts.dst_addr) {
+		/* Server side for writedata: pre-post all rx buffers up to
+		 * window_size before the loop starts, matching perftest behavior.
+		 */
+		if (fi->rx_attr->mode & FI_RX_CQ_DATA) {
+			for (posted_cnt = 0; posted_cnt < opts.window_size &&
+			     posted_cnt < total_iterations; posted_cnt++) {
+				ret = ft_post_rx(ep, 0,
+					&rx_ctx_arr[posted_cnt %
+					opts.window_size].context);
+				if (ret)
+					return ret;
+			}
+		}
+
+		/* Poll rxcq for completions, reposting as they complete. */
+		while (completed_cnt < total_iterations) {
+			if (!warmup_done &&
+			    completed_cnt >= opts.warmup_iterations) {
+				ft_start();
+				warmup_done = true;
+			}
+			ret = bw_comp_nonblocking(rxcq, &rx_cq_cntr,
+						  &completed_cnt);
+			if (ret < 0)
+				return ret;
+			if (fi->rx_attr->mode & FI_RX_CQ_DATA) {
+				int i;
+				for (i = 0; i < ret &&
+				     posted_cnt < total_iterations; i++) {
+					int err = ft_post_rx(ep, 0,
+						&rx_ctx_arr[posted_cnt %
+						opts.window_size].context);
+					if (err)
+						return err;
+					posted_cnt++;
+				}
+			}
+		}
+	} else {
+		/* Initiator side: post RMA ops and poll completions */
+		while (posted_cnt < total_iterations ||
+		       completed_cnt < total_iterations) {
+			if (!warmup_done &&
+			    completed_cnt >= opts.warmup_iterations) {
+				ft_start();
+				warmup_done = true;
+			}
+
+			while (posted_cnt < total_iterations &&
+			       (posted_cnt - completed_cnt) <
+			       opts.window_size) {
+				offset = rma_start_offset +
+					 (posted_cnt % opts.window_size) *
+					 opts.transfer_size;
+
+				buf = (opts.rma_op == FT_RMA_READ) ?
+				      rx_buf + offset : tx_buf + offset;
+
+				/* A post carrying FI_MORE may not be
+				 * submitted until a later post without it.
+				 * Drop the flag on the post that fills the
+				 * window as well: nothing would follow it
+				 * until a completion arrives, and none can
+				 * if every outstanding post is still held.
+				 */
+				flags = (post_list > 1 &&
+					 (posted_cnt + 1) % post_list &&
+					 posted_cnt + 1 < total_iterations &&
+					 posted_cnt + 1 - completed_cnt <
+					 opts.window_size) ?
+					FI_MORE : 0;
+
+				ret = post_rma(buf,
+					       opts.transfer_size, remote,
+					       &tx_ctx_arr[posted_cnt %
+					       opts.window_size].context,
+					       flags);
+				if (ret == -FI_EAGAIN)
+					break;
+				if (ret)
+					return ret;
+				posted_cnt++;
+			}
+
+			ret = bw_comp_nonblocking(txcq, &tx_cq_cntr,
+						  &completed_cnt);
+			if (ret < 0)
+				return ret;
+		}
+	}
+
+	ft_stop();
+	if (opts.machr)
+		show_perf_mr(opts.transfer_size, opts.iterations, &start, &end,
+			     1, opts.argc, opts.argv);
+	else
+		show_perf(NULL, opts.transfer_size, opts.iterations, &start,
+			  &end, 1);
+
+	return 0;
+}
+
+/*
+ * Initialize the fabric, exchange RMA keys, then run the benchmark once per
+ * enabled transfer size, or once at opts.transfer_size if FT_OPT_SIZE is set.
+ *
+ * Returns 0 on success or the first error encountered.
+ */
+static int run(void)
+{
+	int i, ret;
+
+	ret = ft_init_fabric();
+	if (ret)
+		return ret;
+
+	ret = ft_exchange_keys(&remote);
+	if (ret)
+		return ret;
+
+	if (!(opts.options & FT_OPT_SIZE)) {
+		for (i = 0; i < TEST_CNT; i++) {
+			if (!ft_use_size(i, opts.sizes_enabled))
+				continue;
+			opts.transfer_size = test_size[i].size;
+			init_test(&opts, test_name, sizeof(test_name));
+			ret = bandwidth_rma_efa(&remote);
+			if (ret)
+				goto out;
+		}
+	} else {
+		init_test(&opts, test_name, sizeof(test_name));
+		ret = bandwidth_rma_efa(&remote);
+		if (ret)
+			goto out;
+	}
+
+	ft_finalize();
+out:
+	return ret;
+}
+
+enum {
+	OPT_HIGH_PPS = 256,
+	OPT_POST_LIST,
+};
+
+static struct option efa_extra_opts[] = {
+	{"high-pps", no_argument, NULL, OPT_HIGH_PPS},
+	{"post-list", required_argument, NULL, OPT_POST_LIST},
+	{0, 0, 0, 0}
+};
+
+static struct option *efa_long_opts;
+
+/*
+ * Build a merged long options table by prepending EFA-specific options
+ * to the shared fabtests long_opts. This allows getopt_long to parse
+ * both EFA-specific (e.g. --high-pps) and shared (e.g. --no-rx-cq-data)
+ * long options in a single call.
+ *
+ * Returns 0 on success or -FI_ENOMEM if the table cannot be allocated.
+ */
+static int build_long_opts(void)
+{
+	int shared_cnt, i;
+	int extra_cnt = sizeof(efa_extra_opts) / sizeof(efa_extra_opts[0]) - 1;
+
+	for (shared_cnt = 0; long_opts[shared_cnt].name; shared_cnt++)
+		;
+	efa_long_opts = calloc(shared_cnt + extra_cnt + 1,
+			       sizeof(*efa_long_opts));
+	if (!efa_long_opts)
+		return -FI_ENOMEM;
+	for (i = 0; i < extra_cnt; i++)
+		efa_long_opts[i] = efa_extra_opts[i];
+	for (i = 0; i < shared_cnt; i++)
+		efa_long_opts[extra_cnt + i] = long_opts[i];
+
+	return 0;
+}
+
+int main(int argc, char **argv)
+{
+	int op, ret, cleanup_ret;
+
+	opts = INIT_OPTS;
+	opts.options |= FT_OPT_BW;
+	opts.rma_op = FT_RMA_WRITE;
+
+	hints = fi_allocinfo();
+	if (!hints)
+		return EXIT_FAILURE;
+
+	hints->caps = FI_MSG | FI_RMA;
+	hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
+	hints->mode = FI_CONTEXT | FI_CONTEXT2;
+	hints->domain_attr->threading = FI_THREAD_DOMAIN;
+	hints->addr_format = opts.address_format;
+
+	ret = build_long_opts();
+	if (ret) {
+		FT_PRINTERR("build_long_opts", ret);
+		return EXIT_FAILURE;
+	}
+
+	while ((op = getopt_long(argc, argv, "h" CS_OPTS INFO_OPTS API_OPTS
+				 BENCHMARK_OPTS, efa_long_opts,
+				 &lopt_idx)) != -1) {
+		switch (op) {
+		case OPT_HIGH_PPS:
+			use_high_pps = 1;
+			break;
+		case OPT_POST_LIST:
+			post_list = atoi(optarg);
+			if (post_list < 1) {
+				fprintf(stderr,
+					"--post-list must be a positive integer\n");
+				return EXIT_FAILURE;
+			}
+			break;
+		case '?':
+		case 'h':
+			ft_csusage(argv[0],
+				   "EFA RMA bandwidth test.");
+			ft_benchmark_usage();
+			FT_PRINT_OPTS_USAGE("-o ",
+				"RMA op type: write|writedata|read (default: write)");
+			FT_PRINT_OPTS_USAGE("--high-pps",
+				"Enable FI_EFA_WR_HIGH_PPS flag on writes");
+			FT_PRINT_OPTS_USAGE("--post-list ",
+				"Batch n posts per doorbell using FI_MORE (default: 1)");
+			fprintf(stderr, "Note: read/write bw tests are bidirectional.\n"
+				" writedata bw test is unidirectional"
+				" from the client side.\n");
+			ft_longopts_usage();
+			return EXIT_FAILURE;
+		default:
+			if (!ft_parse_long_opts(op, optarg))
+				continue;
+			ft_parse_benchmark_opts(op, optarg);
+			ft_parse_api_opts(op, optarg, hints, &opts);
+			ft_parseinfo(op, optarg, hints, &opts);
+			ft_parsecsopts(op, optarg, &opts);
+			break;
+		}
+	}
+
+	/* The merged table is only needed while parsing options. */
+	free(efa_long_opts);
+	efa_long_opts = NULL;
+
+	if (optind < argc)
+		opts.dst_addr = argv[optind];
+
+	hints->domain_attr->mr_mode = opts.mr_mode;
+	hints->tx_attr->tclass = FI_TC_BULK_DATA;
+	/* Using OOB sync to not mess up with the tx/rx seq cntrs in fabtests common code */
+	opts.options |= FT_OPT_OOB_SYNC;
+
+	const char *op_str = "WRITE";
+	if (opts.rma_op == FT_RMA_WRITEDATA)
+		op_str = "WRITEDATA";
+	else if (opts.rma_op == FT_RMA_READ)
+		op_str = "READ";
+
+	if (use_high_pps)
+		printf("High PPS mode: ENABLED\n");
+	else
+		printf("High PPS mode: DISABLED\n");
+
+	printf("RMA op: %s\n", op_str);
+
+	ret = run();
+
+	cleanup_ret = ft_free_res();
+	return -(ret ? ret : cleanup_ret);
+}
diff --git a/fabtests/pytest/efa/test_rma_bw.py b/fabtests/pytest/efa/test_rma_bw.py
index 28168dd06ac..c7325add077 100644
--- a/fabtests/pytest/efa/test_rma_bw.py
+++ b/fabtests/pytest/efa/test_rma_bw.py
@@ -141,3 +141,23 @@ def test_rma_bw_sread(cmdline_args, rma_operation_type, rma_bw_completion_semant
     efa_run_client_server_test(cmdline_args, command, "short", rma_bw_completion_semantic,
                                rma_bw_memory_type, message_sizes, timeout=timeout, fabric=rma_fabric,
                                additional_env=additional_env)
+
+
+@pytest.mark.fabric(params=["efa", "efa-direct"])
+@pytest.mark.message_sizes(default_efa=PERF_SIZES, default_efa_direct=DIRECT_RMA_SIZES,
+                           pr_ci_efa=PERF_PR_CI, pr_ci_efa_direct=DIRECT_RMA_SIZES)
+@pytest.mark.functional
+@pytest.mark.parametrize("operation_type", ["write", "writedata"])
+# Only test host and cuda memory; other HMEM types do not change the RMA path.
+@pytest.mark.parametrize("mem_type",
+                         ["host_to_host",
+                          pytest.param("cuda_to_cuda", marks=pytest.mark.cuda_memory)])
+def test_efa_rma_bw_high_pps(cmdline_args, operation_type, mem_type, message_sizes, rma_fabric):
+    command = "fi_efa_rma_bw -e rdm --high-pps"
+    command += " -o " + operation_type
+    efa_run_client_server_test(cmdline_args, command, "short",
+                               completion_semantic="transmit_complete",
+                               memory_type=mem_type,
+                               message_size=message_sizes,
+                               fabric=rma_fabric,
+                               additional_env="FI_EFA_ENABLE_HIGH_PPS=1 FI_EFA_ENABLE_SHM_TRANSFER=0")
diff --git a/man/fi_efa.7.md b/man/fi_efa.7.md
index 57ca2b276ee..553d7f98b69 100644
--- a/man/fi_efa.7.md
+++ b/man/fi_efa.7.md
@@ -190,6 +190,17 @@ provider for AWS Neuron or Habana SynapseAI.
 For efa-direct, FI_RX_CQ_DATA is required when FI_OPT_EFA_USE_UNSOLICITED_WRITE_RECV is false, or it will return
 -FI_EOPNOTSUPP for the call to fi_setopt().
 
+# PROVIDER SPECIFIC OPERATION FLAGS
+
+The EFA provider defines provider-specific operation flags that can be passed
+in the `flags` argument of data transfer calls such as `fi_writemsg()`.
+
+*FI_EFA_WR_HIGH_PPS*
+:	Hints the device to optimize an RDMA write operation (e.g., one posted
+	with `fi_writemsg()`) for a higher message rate. The flag is honored
+	only when the environment variable `FI_EFA_ENABLE_HIGH_PPS` is set to a
+	non-zero integer; otherwise it is ignored.
+
 # PROVIDER SPECIFIC OPERATION EXTENSIONS
 
 The efa provider exports extensions for operations that are not provided by the standard libfabric interface.
These extensions are available via
diff --git a/prov/efa/configure.m4 b/prov/efa/configure.m4
index 260a6358573..1059edee9e6 100644
--- a/prov/efa/configure.m4
+++ b/prov/efa/configure.m4
@@ -89,6 +89,7 @@ AC_DEFUN([FI_EFA_CONFIGURE],[
 	have_efadv_query_qp_wqs=0
 	have_efadv_query_cq=0
 	have_efadv_cq_attr_db=0
+	have_efadv_wr_processing_hints=0
 	have_ibv_create_comp_channel=0
 	have_ibv_get_cq_event=0
 
@@ -189,6 +190,36 @@ AC_DEFUN([FI_EFA_CONFIGURE],[
 			[have_efadv_sl=0],
 			[[#include ]])
 
+		have_efadv_qp_from_ibv_qp_ex=0
+		AC_CHECK_DECL([efadv_qp_from_ibv_qp_ex],
+			[have_efadv_qp_from_ibv_qp_ex=1],
+			[],
+			[[#include ]])
+
+		have_efadv_qp_init_attr_wr_flags=0
+		AC_CHECK_MEMBER([struct efadv_qp_init_attr.wr_flags],
+			[have_efadv_qp_init_attr_wr_flags=1],
+			[],
+			[[#include ]])
+
+		have_efadv_wr_hint_burst_pps=0
+		AC_CHECK_DECL([EFADV_WR_PROCESSING_HINT_BURST_PPS_SENSITIVE],
+			[have_efadv_wr_hint_burst_pps=1],
+			[],
+			[[#include ]])
+
+		have_efadv_wr_set_processing_hints=0
+		AC_CHECK_DECL([efadv_wr_set_processing_hints],
+			[have_efadv_wr_set_processing_hints=1],
+			[],
+			[[#include ]])
+
+		AS_IF([test "$have_efadv_qp_from_ibv_qp_ex" = "1" &&
+		       test "$have_efadv_qp_init_attr_wr_flags" = "1" &&
+		       test "$have_efadv_wr_hint_burst_pps" = "1" &&
+		       test "$have_efadv_wr_set_processing_hints" = "1"],
+			[have_efadv_wr_processing_hints=1])
+
 		have_efadv_query_qp_wqs=1
 		AC_CHECK_DECL([efadv_query_qp_wqs],
 			[],
@@ -254,6 +285,9 @@ AC_DEFUN([FI_EFA_CONFIGURE],[
 	AC_DEFINE_UNQUOTED([HAVE_EFADV_SL],
 		[$have_efadv_sl],
 		[Indicates if efadv_qp_init_attr has sl])
+	AC_DEFINE_UNQUOTED([HAVE_EFADV_WR_PROCESSING_HINTS],
+		[$have_efadv_wr_processing_hints],
+		[Indicates if efadv_qp_from_ibv_qp_ex and WQE processing hint support is available])
 	AC_DEFINE_UNQUOTED([HAVE_EFADV_QUERY_QP_WQS],
 		[$have_efadv_query_qp_wqs],
 		[Indicates if efadv_query_qp_wqs is available])
diff --git a/prov/efa/src/efa_base_ep.c b/prov/efa/src/efa_base_ep.c
index 62657220848..b1a8ee87106 100644
--- a/prov/efa/src/efa_base_ep.c
+++
b/prov/efa/src/efa_base_ep.c
@@ -301,6 +301,9 @@ int efa_qp_create(struct efa_qp **qp, struct ibv_qp_init_attr_ex *init_attr_ex,
 		efa_attr.sl = EFA_QP_DEFAULT_SERVICE_LEVEL;
 		if (tclass == FI_TC_LOW_LATENCY)
 			efa_attr.sl = EFA_QP_LOW_LATENCY_SERVICE_LEVEL;
+#endif
+#if HAVE_EFADV_WR_PROCESSING_HINTS
+		efa_attr.wr_flags |= EFADV_WR_EX_WITH_PROCESSING_HINTS;
 #endif
 		(*qp)->ibv_qp = efadv_create_qp_ex(
 			init_attr_ex->pd->context, init_attr_ex, &efa_attr,
diff --git a/prov/efa/src/efa_data_path_direct_entry.h b/prov/efa/src/efa_data_path_direct_entry.h
index ea70155478d..c781d7e1629 100644
--- a/prov/efa/src/efa_data_path_direct_entry.h
+++ b/prov/efa/src/efa_data_path_direct_entry.h
@@ -642,6 +642,10 @@ efa_data_path_direct_post_write(
 		efa_send_wr_set_imm_data(meta_desc, data);
 	}
 
+
+	if (efa_env.enable_high_pps && (flags & FI_EFA_WR_HIGH_PPS))
+		efa_send_wr_set_processing_hint_high_pps(meta_desc);
+
 	/* Set remote memory information */
 	efa_send_wr_set_rdma_addr(remote_mem, remote_key, remote_addr);
 	remote_mem->length = efa_sge_total_bytes(sge_list, sge_count);
diff --git a/prov/efa/src/efa_data_path_direct_internal.h b/prov/efa/src/efa_data_path_direct_internal.h
index 44f1c40088f..d7fb3b051bd 100644
--- a/prov/efa/src/efa_data_path_direct_internal.h
+++ b/prov/efa/src/efa_data_path_direct_internal.h
@@ -581,6 +581,12 @@ EFA_ALWAYS_INLINE void efa_send_wr_set_imm_data(struct efa_io_tx_meta_desc *meta
 	EFA_SET(&meta_desc->ctrl1, EFA_IO_TX_META_DESC_HAS_IMM, 1);
 }
 
+EFA_ALWAYS_INLINE void efa_send_wr_set_processing_hint_high_pps(struct efa_io_tx_meta_desc *meta_desc)
+{
+	EFA_SET(&meta_desc->ctrl3, EFA_IO_TX_META_DESC_PROCESSING_HINTS, 1);
+}
+
+
 EFA_ALWAYS_INLINE void efa_send_wr_set_rdma_addr(struct efa_io_remote_mem_addr *remote_mem,
 						 uint32_t rkey,
 						 uint64_t remote_addr)
diff --git a/prov/efa/src/efa_data_path_ops.h b/prov/efa/src/efa_data_path_ops.h
index 3b4157f88f5..26411879fc2 100644
--- a/prov/efa/src/efa_data_path_ops.h
+++ b/prov/efa/src/efa_data_path_ops.h
@@ -155,6 +155,12 @@ efa_ibv_post_write(
 	ibv_wr_set_sge_list(qp->ibv_qp_ex, sge_count, sge_list);
 	ibv_wr_set_ud_addr(qp->ibv_qp_ex, ah->ibv_ah, qpn, qkey);
 
+#if HAVE_EFADV_WR_PROCESSING_HINTS
+	if (efa_env.enable_high_pps && (flags & FI_EFA_WR_HIGH_PPS))
+		efadv_wr_set_processing_hints(efadv_qp_from_ibv_qp_ex(qp->ibv_qp_ex),
+					      EFADV_WR_PROCESSING_HINT_BURST_PPS_SENSITIVE);
+#endif
+
 	if (!(flags & FI_MORE)) {
 		ret = ibv_wr_complete(qp->ibv_qp_ex);
 		base_ep->is_wr_started = false;
diff --git a/prov/efa/src/efa_env.c b/prov/efa/src/efa_env.c
index 3d6849313ed..6620e1011af 100644
--- a/prov/efa/src/efa_env.c
+++ b/prov/efa/src/efa_env.c
@@ -41,6 +41,7 @@ struct efa_env efa_env = {
 	.use_data_path_direct = true,
 	.implicit_av_size = 0,
 	.track_mr = 0,
+	.enable_high_pps = 0,
 };
 
 /* @brief Read and store the FI_EFA_* environment variables.
@@ -141,6 +142,11 @@ void efa_env_param_get(void)
 	fi_param_get_bool(&efa_prov, "use_data_path_direct", &efa_env.use_data_path_direct);
 	fi_param_get_bool(&efa_prov, "track_mr", &efa_env.track_mr);
 
+	/* Read FI_EFA_ENABLE_HIGH_PPS with getenv() instead of fi_param_get_*() so it is not listed by fi_info -e */
+	char *high_pps_env = getenv("FI_EFA_ENABLE_HIGH_PPS");
+	if (high_pps_env)
+		efa_env.enable_high_pps = atoi(high_pps_env);
+
 	efa_fork_support_request_initialize();
 }
 
diff --git a/prov/efa/src/efa_env.h b/prov/efa/src/efa_env.h
index 5d26387f43f..ee108bbb811 100644
--- a/prov/efa/src/efa_env.h
+++ b/prov/efa/src/efa_env.h
@@ -79,6 +79,11 @@ struct efa_env {
 	 * operations still reference an MR when it is closed.
 	 */
 	int track_mr;
+	/**
+	 * Non-zero enables the high PPS (packets per second) processing hint for
+	 * writes posted with FI_EFA_WR_HIGH_PPS. Set from FI_EFA_ENABLE_HIGH_PPS via atoi().
+	 */
+	int enable_high_pps;
 };
 
 extern struct efa_env efa_env;
diff --git a/prov/efa/src/efa_io_defs.h b/prov/efa/src/efa_io_defs.h
index efbc0578d4a..fda9934b73f 100644
--- a/prov/efa/src/efa_io_defs.h
+++ b/prov/efa/src/efa_io_defs.h
@@ -78,7 +78,14 @@ struct efa_io_tx_meta_desc {
 
 	uint16_t ah;
 
-	uint16_t reserved;
+	/*
+	 * control flags
+	 * 1:0 : processing_hints - enum efa_io_processing_hint
+	 * 7:2 : reserved - MBZ
+	 */
+	uint8_t ctrl3;
+
+	uint8_t reserved;
 
 	/* Queue key */
 	uint32_t qkey;
@@ -401,6 +408,7 @@ struct efa_io_rx_cdesc_ex {
 #define EFA_IO_TX_META_DESC_FIRST_MASK                      BIT(2)
 #define EFA_IO_TX_META_DESC_LAST_MASK                       BIT(3)
 #define EFA_IO_TX_META_DESC_COMP_REQ_MASK                   BIT(4)
+#define EFA_IO_TX_META_DESC_PROCESSING_HINTS_MASK           GENMASK(1, 0)
 
 /* tx_buf_desc */
 #define EFA_IO_TX_BUF_DESC_LKEY_MASK                        GENMASK(23, 0)
diff --git a/prov/efa/src/fi_ext_efa.h b/prov/efa/src/fi_ext_efa.h
index 5a804083f2b..e725aa44730 100644
--- a/prov/efa/src/fi_ext_efa.h
+++ b/prov/efa/src/fi_ext_efa.h
@@ -88,4 +88,17 @@ struct fi_efa_feature_ops {
 	bool (*query)(const char *feature);
 };
 
+
+/**
+ * EFA provider specific op flags (bits 60 - 63).
+ * See rdma/fabric.h for bits 0 - 59, which apply to all providers.
+ */
+
+/*
+ * Hint the device to optimize RDMA write operations for a higher message
+ * rate. Pass this flag in the 'flags' argument of data transfer calls
+ * such as fi_writemsg().
+ */
+#define FI_EFA_WR_HIGH_PPS (1ULL << 60)
+
 #endif /* _FI_EXT_EFA_H_ */
diff --git a/prov/efa/test/efa_unit_test_data_path_direct.c b/prov/efa/test/efa_unit_test_data_path_direct.c
index b74406dc119..fa988ba9d77 100644
--- a/prov/efa/test/efa_unit_test_data_path_direct.c
+++ b/prov/efa/test/efa_unit_test_data_path_direct.c
@@ -233,3 +233,22 @@ void test_efa_data_path_direct_qp_gen_increments_across_qps(struct efa_resource
 	skip();
 #endif
 }
+
+/**
+ * @brief Test that efa_send_wr_set_processing_hint_high_pps sets the processing
+ * hint bits in ctrl3 of the TX WQE metadata descriptor.
+ */
+void test_efa_data_path_direct_write_high_pps_hint_set(struct efa_resource **state)
+{
+#if HAVE_EFA_DATA_PATH_DIRECT
+	struct efa_io_tx_meta_desc meta_desc = {0};
+
+	assert_true(EFA_GET(&meta_desc.ctrl3, EFA_IO_TX_META_DESC_PROCESSING_HINTS) == 0);
+
+	efa_send_wr_set_processing_hint_high_pps(&meta_desc);
+
+	assert_true(EFA_GET(&meta_desc.ctrl3, EFA_IO_TX_META_DESC_PROCESSING_HINTS) != 0);
+#else
+	skip();
+#endif
+}
diff --git a/prov/efa/test/efa_unit_test_mocks.c b/prov/efa/test/efa_unit_test_mocks.c
index f7417be3d18..26e0afe5c15 100644
--- a/prov/efa/test/efa_unit_test_mocks.c
+++ b/prov/efa/test/efa_unit_test_mocks.c
@@ -789,3 +789,39 @@ void *__wrap_calloc(size_t nmemb, size_t size)
 
 	return __real_calloc(nmemb, size);
 }
+
+void efa_mock_ibv_wr_start_no_op(struct ibv_qp_ex *qp)
+{
+}
+
+void efa_mock_ibv_wr_rdma_write_save_wr(struct ibv_qp_ex *qp, uint32_t rkey,
+					uint64_t remote_addr)
+{
+	g_ibv_submitted_wr_id_vec[g_ibv_submitted_wr_id_cnt] = (void *)qp->wr_id;
+	g_ibv_submitted_wr_id_cnt++;
+}
+
+void efa_mock_ibv_wr_set_sge_list_no_op(struct ibv_qp_ex *qp,
+					size_t num_sge,
+					const struct ibv_sge *sge_list)
+{
+}
+
+void efa_mock_ibv_wr_set_ud_addr_no_op(struct ibv_qp_ex *qp, struct ibv_ah *ah,
+				       uint32_t remote_qpn, uint32_t remote_qkey)
+{
+}
+
+int efa_mock_ibv_wr_complete_no_op(struct ibv_qp_ex *qp)
+{
+	return 0;
+}
+
+#if HAVE_EFADV_WR_PROCESSING_HINTS
+void efa_mock_efadv_wr_set_processing_hints(struct efadv_qp *efadv_qp,
+					    uint32_t hints)
+{
+	function_called();
+	check_expected(hints);
+}
+#endif
diff --git a/prov/efa/test/efa_unit_test_mocks.h b/prov/efa/test/efa_unit_test_mocks.h
index d5ea4ce773c..0065c3ecf23 100644
--- a/prov/efa/test/efa_unit_test_mocks.h
+++ b/prov/efa/test/efa_unit_test_mocks.h
@@ -154,6 +154,20 @@ void efa_mock_ibv_wr_rdma_read_save_wr(struct ibv_qp_ex *qp, uint32_t rkey,
 void efa_mock_ibv_wr_rdma_write_imm_save_wr(struct ibv_qp_ex *qp, uint32_t rkey,
 					    uint64_t remote_addr,
 					    __be32 imm_data);
+void efa_mock_ibv_wr_start_no_op(struct ibv_qp_ex *qp);
+void efa_mock_ibv_wr_rdma_write_save_wr(struct ibv_qp_ex *qp, uint32_t rkey,
+					uint64_t remote_addr);
+void efa_mock_ibv_wr_set_sge_list_no_op(struct ibv_qp_ex *qp,
+					size_t num_sge,
+					const struct ibv_sge *sge_list);
+void efa_mock_ibv_wr_set_ud_addr_no_op(struct ibv_qp_ex *qp, struct ibv_ah *ah,
+				       uint32_t remote_qpn, uint32_t remote_qkey);
+int efa_mock_ibv_wr_complete_no_op(struct ibv_qp_ex *qp);
+
+#if HAVE_EFADV_WR_PROCESSING_HINTS
+void efa_mock_efadv_wr_set_processing_hints(struct efadv_qp *efadv_qp,
+					    uint32_t hints);
+#endif
 
 struct efa_unit_test_mocks
 {
diff --git a/prov/efa/test/efa_unit_test_rma.c b/prov/efa/test/efa_unit_test_rma.c
index 80778c4165d..83b826897e1 100644
--- a/prov/efa/test/efa_unit_test_rma.c
+++ b/prov/efa/test/efa_unit_test_rma.c
@@ -636,3 +636,90 @@ void test_efa_rma_write_0_byte_with_inject_flag(struct efa_resource **state)
 
 	assert_int_equal(g_ibv_submitted_wr_id_cnt, 1);
 }
+
+#if HAVE_EFADV_WR_PROCESSING_HINTS
+/**
+ * @brief Helper to test efa_ibv_post_write processing hint behavior.
+ *
+ * Temporarily sets efa_env.enable_high_pps, posts one write with the given
+ * flags, and expects the hint mock to fire only for FI_EFA_WR_HIGH_PPS.
+ */
+static void test_efa_ibv_post_write_processing_hints_impl(struct efa_resource *resource,
+							   uint64_t flags)
+{
+	struct efa_unit_test_buff local_buff;
+	struct ibv_sge sge;
+	struct efa_base_ep *base_ep;
+	struct efa_qp *qp;
+	struct ibv_qp_ex *ibv_qpx;
+	struct efadv_qp *efadv_qp;
+	struct efa_ah fake_ah = {0};
+	int enable_high_pps_orig;
+	int ret;
+
+	efa_unit_test_resource_construct(resource, FI_EP_RDM, EFA_DIRECT_FABRIC_NAME);
+
+	base_ep = container_of(resource->ep, struct efa_base_ep, util_ep.ep_fid);
+	qp = base_ep->qp;
+	ibv_qpx = qp->ibv_qp_ex;
+
+	/* Set up ibv_wr function pointer mocks */
+	ibv_qpx->wr_start = &efa_mock_ibv_wr_start_no_op;
+	ibv_qpx->wr_rdma_write = &efa_mock_ibv_wr_rdma_write_save_wr;
+	ibv_qpx->wr_set_sge_list = &efa_mock_ibv_wr_set_sge_list_no_op;
+	ibv_qpx->wr_set_ud_addr = &efa_mock_ibv_wr_set_ud_addr_no_op;
+	ibv_qpx->wr_complete = &efa_mock_ibv_wr_complete_no_op;
+
+	/* Set up efadv_qp processing hint mock */
+	efadv_qp = efadv_qp_from_ibv_qp_ex(ibv_qpx);
+	efadv_qp->wr_set_processing_hints = efa_mock_efadv_wr_set_processing_hints;
+
+	enable_high_pps_orig = efa_env.enable_high_pps;
+	efa_env.enable_high_pps = true;
+
+	if (flags & FI_EFA_WR_HIGH_PPS) {
+		expect_function_call(efa_mock_efadv_wr_set_processing_hints);
+		expect_value(efa_mock_efadv_wr_set_processing_hints, hints,
+			     EFADV_WR_PROCESSING_HINT_BURST_PPS_SENSITIVE);
+	}
+
+	efa_unit_test_buff_construct(&local_buff, resource, 4096);
+	sge.addr = (uintptr_t)local_buff.buff;
+	sge.length = local_buff.size;
+	sge.lkey = ((struct efa_mr *)fi_mr_desc(local_buff.mr))->ibv_mr->lkey;
+
+	ret = efa_ibv_post_write(qp, &sge, 1, 123456, 0x87654321, 0, 0,
+				 flags, &fake_ah, 0, 0);
+
+	/* Restore global state before asserting so a failure cannot leak it. */
+	efa_env.enable_high_pps = enable_high_pps_orig;
+	efa_unit_test_buff_destruct(&local_buff);
+	assert_int_equal(ret, 0);
+}
+#endif
+
+/**
+ * @brief Test that efa_ibv_post_write calls efadv_wr_set_processing_hints
+ * when FI_EFA_WR_HIGH_PPS flag is set.
+ */
+void test_efa_ibv_post_write_processing_hints_with_high_pps(struct efa_resource **state)
+{
+#if HAVE_EFADV_WR_PROCESSING_HINTS
+	test_efa_ibv_post_write_processing_hints_impl(*state, FI_EFA_WR_HIGH_PPS);
+#else
+	skip();
+#endif
+}
+
+/**
+ * @brief Test that efa_ibv_post_write does NOT call efadv_wr_set_processing_hints
+ * when FI_DELIVERY_COMPLETE flag is set without FI_EFA_WR_HIGH_PPS.
+ */
+void test_efa_ibv_post_write_processing_hints_without_high_pps(struct efa_resource **state)
+{
+#if HAVE_EFADV_WR_PROCESSING_HINTS
+	test_efa_ibv_post_write_processing_hints_impl(*state, FI_DELIVERY_COMPLETE);
+#else
+	skip();
+#endif
+}
diff --git a/prov/efa/test/efa_unit_tests.c b/prov/efa/test/efa_unit_tests.c
index 4b97e15e89b..bbe865d5f42 100644
--- a/prov/efa/test/efa_unit_tests.c
+++ b/prov/efa/test/efa_unit_tests.c
@@ -447,6 +447,8 @@ int main(void)
 		cmocka_unit_test_setup_teardown(test_efa_rma_write, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rma_writev, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rma_writemsg, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+		cmocka_unit_test_setup_teardown(test_efa_ibv_post_write_processing_hints_with_high_pps, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
+		cmocka_unit_test_setup_teardown(test_efa_ibv_post_write_processing_hints_without_high_pps, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rma_writedata, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rma_inject_write, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
 		cmocka_unit_test_setup_teardown(test_efa_rma_inject_writedata, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown),
@@ -501,6 +503,7 @@ int main(void)
 		cmocka_unit_test_setup_teardown(test_efa_data_path_direct_dev_req_id_roundtrip, efa_unit_test_mocks_setup,
efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_data_path_direct_stale_completion_detected, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_data_path_direct_qp_gen_increments_across_qps, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), + cmocka_unit_test_setup_teardown(test_efa_data_path_direct_write_high_pps_hint_set, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_cq_read_no_completion, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_cq_read_send_success, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), cmocka_unit_test_setup_teardown(test_efa_cq_read_senddata_success, efa_unit_test_mocks_setup, efa_unit_test_mocks_teardown), diff --git a/prov/efa/test/efa_unit_tests.h b/prov/efa/test/efa_unit_tests.h index 74b4755c916..02db358c94b 100644 --- a/prov/efa/test/efa_unit_tests.h +++ b/prov/efa/test/efa_unit_tests.h @@ -516,6 +516,7 @@ void test_efa_data_path_direct_qp_gen_initialization(); void test_efa_data_path_direct_dev_req_id_roundtrip(); void test_efa_data_path_direct_stale_completion_detected(); void test_efa_data_path_direct_qp_gen_increments_across_qps(); +void test_efa_data_path_direct_write_high_pps_hint_set(); /* end efa_unit_test_data_path_direct.c */ @@ -571,6 +572,8 @@ void test_efa_rdm_rma_post_remote_read_partial_fail_no_txe_release(); void test_efa_rdm_rma_partial_post_retry_no_double_free(); void test_efa_rdm_rma_partial_post_retry_no_double_free_read(); void test_efa_rdm_msg_send_multi_pkt_sendv_fail_no_inflight(); +void test_efa_ibv_post_write_processing_hints_with_high_pps(); +void test_efa_ibv_post_write_processing_hints_without_high_pps(); /* end efa_unit_test_rdm_rma.c */ static inline