diff --git a/CODEOWNERS b/CODEOWNERS
index 6c46db6201a86..8038f96a09eaf 100644
--- a/CODEOWNERS
+++ b/CODEOWNERS
@@ -494,5 +494,6 @@ extensions/upstreams/tcp @ggreenway @mattklein123
 /contrib/peak_ewma/load_balancing_policies/ @rroblak @UNOWNED
 /contrib/kae/ @Misakokoro @UNOWNED
 /contrib/istio @kyessenov @wbpcode @keithmattix @krinkinmu @zirain
+/contrib/reverse_tunnel_reporter @agrawroh @aakugan
 /compat/openssl/ @tedjpoole @envoyproxy/envoy-openssl-sync
diff --git a/api/BUILD b/api/BUILD
index 2b36b337b1c34..4cd9d47ab049f 100644
--- a/api/BUILD
+++ b/api/BUILD
@@ -104,6 +104,8 @@ proto_library(
         "//contrib/envoy/extensions/private_key_providers/kae/v3alpha:pkg",
         "//contrib/envoy/extensions/private_key_providers/qat/v3alpha:pkg",
         "//contrib/envoy/extensions/regex_engines/hyperscan/v3alpha:pkg",
+        "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg",
+        "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg",
         "//contrib/envoy/extensions/router/cluster_specifier/golang/v3alpha:pkg",
         "//contrib/envoy/extensions/stat_sinks/kafka/v3:pkg",
         "//contrib/envoy/extensions/tap_sinks/udp_sink/v3alpha:pkg",
diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD
new file mode 100644
index 0000000000000..6409469bbd62f
--- /dev/null
+++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/BUILD
@@ -0,0 +1,13 @@
+# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.
+
+load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")
+
+licenses(["notice"])  # Apache 2
+
+api_proto_package(
+    has_services = True,
+    deps = [
+        "//envoy/config/core/v3:pkg",
+        "@xds//udpa/annotations:pkg",
+    ],
+)
diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto
new file mode 100644
index 0000000000000..eaa122194bc2c
--- /dev/null
+++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.proto
@@ -0,0 +1,50 @@
+syntax = "proto3";
+
+package envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client;
+
+import "google/protobuf/duration.proto";
+
+import "udpa/annotations/status.proto";
+import "validate/validate.proto";
+
+option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client";
+option java_outer_classname = "GrpcClientProto";
+option java_multiple_files = true;
+option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client";
+option (udpa.annotations.file_status).package_version_status = ACTIVE;
+
+// Configuration for the gRPC push-based connection event client.
+// Actively pushes connection events to a cluster over gRPC on an internal timer.
+// [#next-free-field: 7]
+message GrpcClientConfig {
+  // Stat prefix for this client's metrics.
+  string stat_prefix = 1;
+
+  // Name of the cluster to send gRPC requests to.
+  // The cluster must be present in the configuration; otherwise setup fails in
+  // onServerInitialized().
+  string cluster = 2 [(validate.rules).string = {min_len: 1}];
+
+  // Default interval between sending batched connection events.
+  // Default is 5s.
+  google.protobuf.Duration default_send_interval = 3 [(validate.rules).duration = {
+    lte {seconds: 3600}
+    gte {nanos: 25000000}
+  }];
+
+  // Interval between connection retry attempts to the gRPC service.
+  // Connect timeouts are configured at the cluster level and handled by the HTTP/2 client;
+  // this is how long to wait after a failed connect before retrying. Default is 5s.
+  google.protobuf.Duration connect_retry_interval = 4 [(validate.rules).duration = {
+    lte {seconds: 3600}
+    gte {nanos: 25000000}
+  }];
+
+  // Maximum number of retry attempts for failed gRPC sends.
+  // In effect, the cluster has default_send_interval * max_retries to respond.
+  // Default is 5. After this the client disconnects and attempts to reconnect.
+  uint32 max_retries = 5;
+
+  // Maximum number of events to buffer at any given time.
+  // Default is 1,000,000.
+  uint32 max_buffer = 6;
+}
diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto
new file mode 100644
index 0000000000000..ebc381c0c6094
--- /dev/null
+++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.proto
@@ -0,0 +1,117 @@
+syntax = "proto3";
+
+package envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client;
+
+import "envoy/config/core/v3/base.proto";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/struct.proto";
+import "google/protobuf/timestamp.proto";
+import "google/rpc/status.proto";
+
+import "udpa/annotations/status.proto";
+import "validate/validate.proto";
+
+option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client";
+option java_outer_classname = "StreamReverseTunnelsProto";
+option java_multiple_files = true;
+option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client";
+option (udpa.annotations.file_status).package_version_status = ACTIVE;
+
+// [#protodoc-title: Reverse Tunnel Reporting Service]
+
+// ReverseTunnelReportingService allows Envoy instances to report reverse tunnel
+// connection state changes to a management server for monitoring and coordination.
+service ReverseTunnelReportingService {
+  // Bidirectional stream for reporting reverse tunnel connection state changes.
+  // The management server can control reporting intervals and acknowledge received reports.
+  rpc StreamReverseTunnels(stream StreamReverseTunnelsRequest)
+      returns (stream StreamReverseTunnelsResponse) {
+  }
+}
+
+// Request message sent by Envoy to report reverse tunnel state changes.
+// [#next-free-field: 7]
+message StreamReverseTunnelsRequest {
+  // Node identifier for the reporting Envoy instance.
+  // This identifies which Envoy instance is sending the report.
+  config.core.v3.Node node = 1 [(validate.rules).message = {required: true}];
+
+  // List of reverse tunnels that were established since the last report.
+  // Each tunnel represents a new connection from a downstream Envoy.
+  repeated ReverseTunnel added_tunnels = 2;
+
+  // List of tunnel names that were disconnected since the last report.
+  // Only the tunnel name is needed for removal notifications.
+  repeated string removed_tunnel_names = 3
+      [(validate.rules).repeated = {items {string {min_len: 1}}}];
+
+  // Optional metadata for additional context or debugging information.
+  // Can include deployment information, version details, etc.
+  google.protobuf.Struct metadata = 4;
+
+  // Indicates whether this report contains all active tunnels (true) or
+  // only changes since the last report (false). Full pushes are normally sent
+  // only after reconnecting to the server.
+  bool full_push = 5;
+
+  // Unique nonce for this request to enable proper ACK/NACK handling.
+  // Must be non-negative and should increment for each request.
+  // This may also be extended for checksumming and tracking in the future.
+  int64 nonce = 6 [(validate.rules).int64 = {gte: 0}];
+}
+
+// Response message sent by the management server to control reporting behavior.
+message StreamReverseTunnelsResponse {
+  // Node identifier acknowledging which Envoy instance this response is for.
+  // Should match the node from the corresponding request.
+  config.core.v3.Node node = 1;
+
+  // Interval at which Envoy should send tunnel state reports.
+  // Only set this to change the reporting interval; there is no need to repeat
+  // the current value.
+  google.protobuf.Duration report_interval = 2 [(validate.rules).duration = {lte {seconds: 3600}}];
+
+  // Nonce from the request being acknowledged or rejected.
+  // Must match the nonce from the corresponding request.
+  int64 request_nonce = 3 [(validate.rules).int64 = {gte: 0}];
+
+  // Error details if the previous request failed processing.
+  // If populated, indicates the request was rejected (NACK).
+  // If empty, indicates successful processing (ACK).
+  // A NACK terminates the connection, so this field is mainly useful for logging
+  // the reason for a disconnect rather than just observing one.
+  google.rpc.Status error_detail = 4;
+}
+
+// Represents a single reverse tunnel connection with its metadata.
+message ReverseTunnel {
+  // Unique name to identify this tunnel connection.
+  // Typically formatted as "{node_id}|{cluster_id}" or similar.
+  // Must be unique within the reporting Envoy instance.
+  // This name is also used when reporting the disconnection of the associated
+  // tunnel initiator.
+  string name = 1 [(validate.rules).string = {min_len: 1}];
+
+  // Detailed information about the tunnel connection.
+  ReverseTunnelInfo tunnel_info = 2 [(validate.rules).message = {required: true}];
+}
+
+// Detailed information about a reverse tunnel connection.
+message ReverseTunnelInfo {
+  // Identity information of the tunnel initiator (downstream Envoy).
+  // Contains node_id, cluster_id, and tenant_id for proper identification.
+  TunnelInitiatorIdentity identity = 1 [(validate.rules).message = {required: true}];
+
+  // Timestamp when this tunnel connection was created.
+  // Used for ordering events and debugging connection timing issues.
+  google.protobuf.Timestamp created_at = 2 [(validate.rules).timestamp = {required: true}];
+}
+
+message TunnelInitiatorIdentity {
+  // Required: Tenant identifier of the initiating Envoy instance.
+  string tenant_id = 1 [(validate.rules).string = {min_len: 1 max_len: 128}];
+
+  // Required: Cluster identifier of the initiating Envoy instance.
+  string cluster_id = 2 [(validate.rules).string = {min_len: 1 max_len: 128}];
+
+  // Required: Node identifier of the initiating Envoy instance.
+  string node_id = 3 [(validate.rules).string = {min_len: 1 max_len: 128}];
+}
diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD
new file mode 100644
index 0000000000000..5f552f08145ca
--- /dev/null
+++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/BUILD
@@ -0,0 +1,9 @@
+# DO NOT EDIT. This file is generated by tools/proto_format/proto_sync.py.
+
+load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package")
+
+licenses(["notice"])  # Apache 2
+
+api_proto_package(
+    deps = ["@xds//udpa/annotations:pkg"],
+)
diff --git a/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto
new file mode 100644
index 0000000000000..2b1315d801544
--- /dev/null
+++ b/api/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.proto
@@ -0,0 +1,32 @@
+syntax = "proto3";
+
+package envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters;
+
+import "google/protobuf/any.proto";
+
+import "udpa/annotations/status.proto";
+import "validate/validate.proto";
+
+option java_package = "io.envoyproxy.envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters";
+option java_outer_classname = "EventReporterProto";
+option java_multiple_files = true;
+option go_package = "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters";
+option (udpa.annotations.file_status).package_version_status = ACTIVE;
+
+message ReverseConnectionReporterClient {
+  // Name used to select the client; it must match the name reported by the client factory.
+  string name = 1 [(validate.rules).string = {min_len: 1}];
+
+  // Typed configuration for the client.
+  google.protobuf.Any typed_config = 2 [(validate.rules).any = {required: true}];
+}
+
+// Configuration for the connection event reporter.
+message EventReporterConfig {
+  // Stat prefix for this reporter's metrics.
+  // Metrics will be emitted as "{stat_prefix}.events_pushed", etc.
+  string stat_prefix = 1;
+
+  // List of clients to report to.
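+  //
+  // The following example is illustrative only: the cluster name and interval are
+  // placeholders rather than defaults, and the client name is the one registered by the
+  // contrib gRPC client factory.
+  //
+  // .. code-block:: yaml
+  //
+  //   clients:
+  //   - name: envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client
+  //     typed_config:
+  //       "@type": type.googleapis.com/envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client.GrpcClientConfig
+  //       cluster: reverse_tunnel_reporting_cluster
+  //       default_send_interval: 5s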
+ repeated ReverseConnectionReporterClient clients = 2 [(validate.rules).repeated = {min_items: 1}]; +} diff --git a/api/versioning/BUILD b/api/versioning/BUILD index d9358c863733e..3cd46fa04a6c4 100644 --- a/api/versioning/BUILD +++ b/api/versioning/BUILD @@ -43,6 +43,8 @@ proto_library( "//contrib/envoy/extensions/private_key_providers/kae/v3alpha:pkg", "//contrib/envoy/extensions/private_key_providers/qat/v3alpha:pkg", "//contrib/envoy/extensions/regex_engines/hyperscan/v3alpha:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg", + "//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg", "//contrib/envoy/extensions/router/cluster_specifier/golang/v3alpha:pkg", "//contrib/envoy/extensions/stat_sinks/kafka/v3:pkg", "//contrib/envoy/extensions/tap_sinks/udp_sink/v3alpha:pkg", diff --git a/contrib/contrib_build_config.bzl b/contrib/contrib_build_config.bzl index 8c7bad41f4d92..9baa8d1541cff 100644 --- a/contrib/contrib_build_config.bzl +++ b/contrib/contrib_build_config.bzl @@ -118,4 +118,10 @@ CONTRIB_EXTENSIONS = { # "envoy.upstreams.http.tcp.golang": "//contrib/golang/upstreams/http/tcp/source:config", + + # + # Reverse tunnel reporters + # + + "envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service": "//contrib/reverse_tunnel_reporter/source:config", } diff --git a/contrib/extensions_metadata.yaml b/contrib/extensions_metadata.yaml index eca1671c20e35..e2f133745f368 100644 --- a/contrib/extensions_metadata.yaml +++ b/contrib/extensions_metadata.yaml @@ -205,3 +205,11 @@ envoy.load_balancing_policies.peak_ewma: status: alpha type_urls: - envoy.extensions.load_balancing_policies.peak_ewma.v3alpha.PeakEwma +envoy.bootstrap.reverse_tunnel.reverse_tunnel_reporting_service: + categories: + - envoy.bootstrap + security_posture: requires_trusted_downstream_and_upstream + status: alpha + type_urls: + - envoy.extensions.reverse_tunnel_reporters.v3alpha.reporters.EventReporterConfig + - envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client.GrpcClientConfig diff --git a/contrib/reverse_tunnel_reporter/source/BUILD b/contrib/reverse_tunnel_reporter/source/BUILD new file mode 100644 index 0000000000000..6890914b2f78a --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/BUILD @@ -0,0 +1,36 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_contrib_extension", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "reverse_tunnel_event_types", + hdrs = [ + "reverse_tunnel_event_types.h", + ], + deps = [ + "//envoy/common:pure_lib", + "//envoy/common:time_interface", + "//envoy/config:typed_config_interface", + "//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib", + "//envoy/server:factory_context_interface", + "//source/common/common:fmt_lib", + "//source/common/config:utility_lib", + "//source/common/protobuf", + "//source/common/protobuf:message_validator_lib", + ], +) + +envoy_cc_contrib_extension( + name = "config", + deps = [ + "//contrib/reverse_tunnel_reporter/source/clients:clients_lib", + "//contrib/reverse_tunnel_reporter/source/reporters:reporters_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/clients/BUILD b/contrib/reverse_tunnel_reporter/source/clients/BUILD new file mode 100644 index 0000000000000..e8886c7f0e7e5 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/BUILD @@ -0,0 +1,16 @@ +load( + "//bazel:envoy_build_system.bzl", + 
"envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "clients_lib", + deps = [ + "//contrib/reverse_tunnel_reporter/source/clients/grpc_client:grpc_client_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/BUILD b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/BUILD new file mode 100644 index 0000000000000..17adeffe19274 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/BUILD @@ -0,0 +1,29 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "grpc_client_lib", + srcs = [ + "client.cc", + "factory.cc", + ], + hdrs = [ + "client.h", + "factory.h", + ], + deps = [ + "//contrib/reverse_tunnel_reporter/source:reverse_tunnel_event_types", + "//envoy/registry", + "//source/common/common:logger_lib", + "//source/common/grpc:typed_async_client_lib", + "//source/common/protobuf:utility_lib", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client:pkg_cc_proto", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.cc b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.cc new file mode 100644 index 0000000000000..f7ad7bf77a7ac --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.cc @@ -0,0 +1,284 @@ +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +GrpcClientConfig::GrpcClientConfig(const GrpcConfigProto& proto_config) + : stat_prefix(PROTOBUF_GET_STRING_OR_DEFAULT(proto_config, stat_prefix, + "reverse_tunnel_reporter_client.grpc_client")), + cluster(proto_config.cluster()), + send_interval(PROTOBUF_GET_MS_OR_DEFAULT(proto_config, default_send_interval, 5000)), + connect_retry_interval( + PROTOBUF_GET_MS_OR_DEFAULT(proto_config, connect_retry_interval, 5000)), + max_retries(proto_config.max_retries() ? proto_config.max_retries() : 5), + max_buffer(proto_config.max_buffer() ? 
proto_config.max_buffer() : 1000000) {} + +GrpcClient::GrpcClient(Server::Configuration::ServerFactoryContext& context, + const GrpcConfigProto& config) + : context_{context}, config_{config}, + service_method_{*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.extensions.reverse_tunnel_reporters.v3alpha.clients.grpc_client" + ".ReverseTunnelReportingService.StreamReverseTunnels")}, + stats_(context_, config_.stat_prefix, config_.cluster) { + ENVOY_LOG(info, + "GrpcClient: constructed: cluster={}, send_interval={}ms, retry_interval={}ms, " + "max_retries={}, max_buffer={}", + config_.cluster, config_.send_interval.count(), config_.connect_retry_interval.count(), + config_.max_retries, config_.max_buffer); + + send_timer_ = context.mainThreadDispatcher().createTimer([this]() { send(false); }); + + retry_timer_ = context.mainThreadDispatcher().createTimer([this]() { connect(); }); + + stats_.send_interval_gauge_.set(config_.send_interval.count()); +} + +void GrpcClient::onServerInitialized(ReverseTunnelReporterWithState* reporter) { + reporter_ = reporter; + + envoy::config::core::v3::GrpcService grpc_service; + grpc_service.mutable_envoy_grpc()->set_cluster_name(config_.cluster); + + auto thread_local_cluster = context_.clusterManager().getThreadLocalCluster(config_.cluster); + if (!thread_local_cluster) { + ENVOY_LOG(error, "GrpcClient: cluster '{}' not found, cannot initialize", config_.cluster); + return; + } + + auto result = context_.clusterManager().grpcAsyncClientManager().getOrCreateRawAsyncClient( + grpc_service, thread_local_cluster->info()->statsScope(), false); + if (!result.ok()) { + ENVOY_LOG(error, "GrpcClient: failed to create gRPC async client: {}", + result.status().message()); + return; + } + + async_client_ = Grpc::AsyncClient(result.value()); + ENVOY_LOG(info, "GrpcClient: initialized: cluster={}", config_.cluster); + + initialized_ = true; + connect(); +} + +void GrpcClient::receiveEvents(ReverseTunnelEvent::BatchedEvents events) { + // Either we errored out of the initialized -> prevent infinite growth. + // Or the onServerInitialized has not been called yet -> no worries we will do a full push on + // connect. + if (!initialized_) { + ENVOY_LOG(debug, "GrpcClient: not initialized, cannot receive events"); + return; + } + + if ((events.size() + queued_events_.size()) > config_.max_buffer) { + ENVOY_LOG(error, + "GrpcClient: buffer overflow: cluster={}, queued={}, incoming={}, max_buffer={}", + config_.cluster, queued_events_.size(), events.size(), config_.max_buffer); + + stats_.events_dropped_counter_.add(events.size()); + + // Only disconnect if the stream is alive. If already disconnected, calling disconnect() + // would re-arm the retry timer, delaying the reconnect that is already scheduled. 
+ if (stream_ != nullptr) { + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, + GrpcDisconnectionReason::DisconnectReason::BUFFER_OVERFLOW)) + .inc(); + disconnect(); + } + return; + } + + const auto incoming_conns = events.connections.size(); + const auto incoming_disconns = events.disconnections.size(); + stats_.queued_events_counter_.add(events.size()); + queued_events_ += std::move(events); + ENVOY_LOG(debug, "GrpcClient: enqueued: cluster={}, +={}, -={}, queued_now={}", config_.cluster, + incoming_conns, incoming_disconns, queued_events_.size()); +} + +void GrpcClient::onReceiveMessage(Grpc::ResponsePtr&& message) { + const auto resp_nonce = message->request_nonce(); + + if (message->has_error_detail()) { + ENVOY_LOG(error, "GrpcClient: NACK: cluster={}, nonce={}", config_.cluster, resp_nonce); + + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Aborted, + GrpcDisconnectionReason::DisconnectReason::NACK_RECEIVED)) + .inc(); + return disconnect(); + } + + // A server cannot ACK a nonce we never sent. If this fires the server has a bug. + ASSERT( + resp_nonce <= nonce_current_, + fmt::format("server acked nonce {} but we only sent up to {}", resp_nonce, nonce_current_)); + + // Valid ACK: must be newer than the last acked watermark and within what we've sent. + if (resp_nonce > nonce_acked_ && resp_nonce <= nonce_current_) { + stats_.acks_received_counter_.inc(); + nonce_acked_ = resp_nonce; + stats_.nonce_acked_gauge_.set(nonce_acked_); + + // The server may dynamically adjust our send cadence via report_interval in each ACK. + // We floor it at kMinSendInterval to prevent tight send loops. + auto new_interval = std::chrono::milliseconds( + PROTOBUF_GET_MS_OR_DEFAULT(*message, report_interval, config_.send_interval.count())); + config_.send_interval = std::max(new_interval, kMinSendInterval); + stats_.send_interval_gauge_.set(config_.send_interval.count()); + + ENVOY_LOG(debug, "GrpcClient: ACK: cluster={}, nonce={}", config_.cluster, resp_nonce); + } else { + ENVOY_LOG(debug, "GrpcClient: out-of-order ACK: cluster={}, nonce={}, expected_range=[{}, {}]", + config_.cluster, resp_nonce, nonce_acked_ + 1, nonce_current_); + stats_.out_of_order_acks_counter_.inc(); + } +} + +void GrpcClient::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { + // Even a graceful close is unexpected — the server should keep the stream open indefinitely. + if (status == Grpc::Status::WellKnownGrpcStatus::Ok) { + ENVOY_LOG(error, "GrpcClient: remote close (ok): cluster={}, message={}", config_.cluster, + message); + } else { + ENVOY_LOG(error, "GrpcClient: remote close: cluster={}, status={}, message={}", config_.cluster, + status, message); + } + + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(status, GrpcDisconnectionReason::DisconnectReason::REMOTE_CLOSE)) + .inc(); + disconnect(); +} + +void GrpcClient::connect() { + ENVOY_LOG(info, "GrpcClient: connecting: cluster={}", config_.cluster); + + stream_ = async_client_.start(service_method_, *this, Http::AsyncClient::StreamOptions()); + if (stream_ == nullptr) { + ENVOY_LOG(error, "GrpcClient: stream creation failed: cluster={}", config_.cluster); + + stats_ + .getCounter( + stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Internal, + GrpcDisconnectionReason::DisconnectReason::STREAM_CREATION_FAILED)) + .inc(); + return disconnect(); + } + + // New stream, new nonce epoch. 
Stale nonces from the previous stream are meaningless. + nonce_acked_ = nonce_current_ = 0; + stats_.nonce_current_gauge_.set(0); + stats_.nonce_acked_gauge_.set(0); + + stats_.connection_attempts_counter_.inc(); + ENVOY_LOG(info, "GrpcClient: connected: cluster={}", config_.cluster); + send(true); +} + +void GrpcClient::disconnect() { + if (stream_ != nullptr) { + stream_.resetStream(); + stream_ = nullptr; + } + + // Stop the send loop — no point sending on a dead stream. The retry timer will reconnect. + send_timer_->disableTimer(); + setTimer(retry_timer_, config_.connect_retry_interval); + ENVOY_LOG(debug, "GrpcClient: disconnect, scheduled reconnect: cluster={}, retry_in_ms={}", + config_.cluster, config_.connect_retry_interval.count()); +} + +void GrpcClient::send(bool full_push) { + ASSERT(stream_ != nullptr); + // Too many in-flight unacked messages — the server is likely dead or stuck. Disconnect + // and let the retry timer establish a fresh stream. + if ((nonce_current_ - nonce_acked_) > config_.max_retries) { + ENVOY_LOG(error, "GrpcClient: too many unacked requests: cluster={}, nonce={}", config_.cluster, + nonce_current_); + + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded, + GrpcDisconnectionReason::DisconnectReason::MAX_RETRIES_EXCEEDED)) + .inc(); + return disconnect(); + } + + ENVOY_LOG(debug, "GrpcClient: sending: cluster={}, full_push={}, queued_now={}", config_.cluster, + full_push, queued_events_.size()); + stats_.send_attempts_counter_.inc(); + stream_.sendMessage(constructMessage(full_push), false); + + setTimer(send_timer_, config_.send_interval); +} + +StreamTunnelsReq GrpcClient::constructMessage(bool full_push) { + // Full push replaces the pending diff queue with a complete snapshot from the reporter. + // Any queued diffs are stale at this point because the full snapshot supersedes them. 
+ if (full_push) { + stats_.events_dropped_counter_.add(queued_events_.size()); + queued_events_ = ReverseTunnelEvent::BatchedEvents{{reporter_->getAllConnections()}, {}}; + stats_.queued_events_counter_.add(queued_events_.size()); + ENVOY_LOG(info, "GrpcClient: full_push queued: cluster={}, queued_now={}", config_.cluster, + queued_events_.size()); + } + + StreamTunnelsReq message; + + auto* node = message.mutable_node(); + node->set_id(context_.localInfo().nodeName()); + node->set_cluster(context_.localInfo().clusterName()); + + auto* added_tunnels = message.mutable_added_tunnels(); + for (auto& conn : queued_events_.connections) { + auto* new_tunnel = added_tunnels->Add(); + new_tunnel->set_name(ReverseTunnelEvent::getName(conn->node_id, conn->cluster_id)); + + auto* tunnel_info = new_tunnel->mutable_tunnel_info(); + TimestampUtil::systemClockToTimestamp(conn->created_at, *tunnel_info->mutable_created_at()); + + auto* tunnel_id = tunnel_info->mutable_identity(); + tunnel_id->set_tenant_id(conn->tenant_id); + tunnel_id->set_cluster_id(conn->cluster_id); + tunnel_id->set_node_id(conn->node_id); + } + + auto* removed_tunnels = message.mutable_removed_tunnel_names(); + for (auto& disconn : queued_events_.disconnections) { + *removed_tunnels->Add() = disconn->name; + } + + message.set_full_push(full_push); + message.set_nonce(++nonce_current_); + stats_.nonce_current_gauge_.set(nonce_current_); + + ENVOY_LOG(debug, + "GrpcClient: built request: cluster={}, full_push={}, add={}, remove={}, nonce={}", + config_.cluster, full_push, queued_events_.connections.size(), + queued_events_.disconnections.size(), message.nonce()); + + stats_.sent_accepted_cnt_counter_.add(queued_events_.connections.size()); + stats_.sent_removed_cnt_counter_.add(queued_events_.disconnections.size()); + queued_events_.clear(); + + return message; +} + +void GrpcClient::setTimer(Event::TimerPtr& timer, const std::chrono::milliseconds& ms) { + if (timer->enabled()) + timer->disableTimer(); + + timer->enableTimer(ms); +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h new file mode 100644 index 0000000000000..5dccd9c0a1785 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h @@ -0,0 +1,233 @@ +#pragma once + +#include +#include +#include + +#include "envoy/event/timer.h" +#include "envoy/grpc/async_client.h" + +#include "source/common/common/logger.h" +#include "source/common/grpc/typed_async_client.h" +#include "source/common/protobuf/utility.h" +#include "source/common/stats/symbol_table.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/stream_reverse_tunnels.pb.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +namespace GrpcDisconnectionReason { + +#define ITEMS(X) \ + X(BUFFER_OVERFLOW, buffer_overflow) \ + X(MAX_RETRIES_EXCEEDED, max_retries_exceeded) \ + X(NACK_RECEIVED, nack_received) \ + X(REMOTE_CLOSE, remote_close) \ + X(STREAM_CREATION_FAILED, stream_creation_failed) + +#define ENUM_DECLARE(name, str) name, +enum class DisconnectReason { ITEMS(ENUM_DECLARE) COUNT }; +#undef ENUM_DECLARE 
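+
+// X-macro: ITEMS expands once into the enum above and once into the parallel string table
+// below, keeping the two in sync; the strings are used as stat tag values via toString().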
+ +#define ENUM_STRING(name, str) #str, +constexpr std::array(DisconnectReason::COUNT)> + DisconnectReasonStrings = {ITEMS(ENUM_STRING)}; +#undef ENUM_STRING + +constexpr absl::string_view toString(DisconnectReason r) { + return DisconnectReasonStrings[static_cast(r)]; +} + +} // namespace GrpcDisconnectionReason + +using GrpcConfigProto = + envoy::extensions::reverse_tunnel_reporters::v3alpha::clients::grpc_client::GrpcClientConfig; +using StreamTunnelsReq = envoy::extensions::reverse_tunnel_reporters::v3alpha::clients:: + grpc_client::StreamReverseTunnelsRequest; +using StreamTunnelsResp = envoy::extensions::reverse_tunnel_reporters::v3alpha::clients:: + grpc_client::StreamReverseTunnelsResponse; + +// Floor for the server-adjustable send interval to prevent tight loops. +static constexpr std::chrono::milliseconds kMinSendInterval{25}; + +/// Parsed and validated configuration from the GrpcClientConfig proto, with defaults applied. +struct GrpcClientConfig { + std::string stat_prefix; + std::string cluster; + std::chrono::milliseconds send_interval; + std::chrono::milliseconds connect_retry_interval; + uint32_t max_retries; + uint32_t max_buffer; + + explicit GrpcClientConfig(const GrpcConfigProto& config); +}; + +/// Bidirectional gRPC streaming client that reports reverse-tunnel connection +/// state to a remote ReverseTunnelReportingService. +/// +/// Protocol: +/// - On connect (and every reconnect) the client does a full push of all +/// known connections obtained from the reporter. +/// - Between connects the client sends incremental diffs (new connections / +/// removals) on a periodic send timer. +/// - Each request carries an incrementing nonce. The server ACKs by echoing +/// the nonce; a NACK carries an error_detail. If too many nonces remain +/// unacked the client disconnects and reconnects. +/// - The server may adjust the send interval via report_interval in its ACK; +/// the client clamps it to kMinSendInterval to prevent tight loops. +/// +/// Lifecycle: constructed by GrpcClientFactory, initialized via +/// onServerInitialized() which creates the gRPC channel and opens the first +/// stream and starts the send cycle. Empty messages are also sent (considered as heartbeat). 
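+///
+/// Back-pressure example (illustrative, assuming max_retries = 2): with nonces 1..3 sent and
+/// none acked, the next send() sees nonce_current_ - nonce_acked_ == 3 > max_retries and
+/// disconnects instead of sending a fourth request; the retry timer then reconnects, resets
+/// the nonce epoch, and starts over with a full push.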
+class GrpcClient : public ReverseTunnelReporterClient, + public Logger::Loggable, + public Grpc::AsyncStreamCallbacks { +public: + GrpcClient(Server::Configuration::ServerFactoryContext& context, const GrpcConfigProto& config); + + // ReverseTunnelReporterClient overrides + void onServerInitialized(ReverseTunnelReporterWithState* reporter) override; + + void receiveEvents(ReverseTunnelEvent::BatchedEvents events) override; + + // RawAsyncStreamCallbacks overrides + void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} + + void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {} + + void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} + + void onReceiveMessage(Grpc::ResponsePtr&& message) override; + + void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; + +private: + // Actions + void connect(); + + void disconnect(); + + void send(bool full_push); + + // Helpers + StreamTunnelsReq constructMessage(bool full_push); + + void setTimer(Event::TimerPtr& timer, const std::chrono::milliseconds& ms); + + // config + Server::Configuration::ServerFactoryContext& context_; + GrpcClientConfig config_; + ReverseTunnelReporterWithState* reporter_{nullptr}; + + // State management + ReverseTunnelEvent::BatchedEvents queued_events_; + int64_t nonce_current_{0}; // Monotonically increasing, bumped on every sendMessage. + int64_t nonce_acked_{0}; // High watermark of server-acknowledged nonces. + // Guards against processing events when onServerInitialized() failed (cluster not + // found, client creation error). Without this the client silently queues to nowhere. + bool initialized_{false}; + + // grpc client and stream requirements + Grpc::AsyncClient async_client_; + Grpc::AsyncStream stream_{}; + const Protobuf::MethodDescriptor& service_method_; + + // timers + Event::TimerPtr retry_timer_; + Event::TimerPtr send_timer_; + + struct GrpcClientStats { + GrpcClientStats(Server::Configuration::ServerFactoryContext& context, + const std::string& stat_prefix, const std::string& cluster_name) + : context_{context}, stat_name_pool_(context.scope().symbolTable()), + stat_prefix_(stat_name_pool_.add(stat_prefix)), + + disconnects_(stat_name_pool_.add("disconnects")), + + status_code_(stat_name_pool_.add("status_code")), + disconnect_reason_(stat_name_pool_.add("disconnect_reason")), + cluster_label_(stat_name_pool_.add("cluster")), + cluster_value_(stat_name_pool_.add(cluster_name)), + + connection_attempts_counter_( + getCounter(stat_name_pool_.add("connection_attempts"), getTags())), + acks_received_counter_(getCounter(stat_name_pool_.add("acks_received"), getTags())), + send_attempts_counter_(getCounter(stat_name_pool_.add("send_attempts"), getTags())), + sent_accepted_cnt_counter_( + getCounter(stat_name_pool_.add("sent_accepted_cnt"), getTags())), + sent_removed_cnt_counter_(getCounter(stat_name_pool_.add("sent_removed_cnt"), getTags())), + events_dropped_counter_(getCounter(stat_name_pool_.add("events_dropped"), getTags())), + queued_events_counter_(getCounter(stat_name_pool_.add("queued_events"), getTags())), + out_of_order_acks_counter_( + getCounter(stat_name_pool_.add("out_of_order_acks"), getTags())), + + send_interval_gauge_(getGauge(stat_name_pool_.add("send_interval"), getTags())), + nonce_current_gauge_(getGauge(stat_name_pool_.add("nonce_current"), getTags())), + nonce_acked_gauge_(getGauge(stat_name_pool_.add("nonce_acked"), getTags())) {} + + Stats::StatNameTagVector getTags() { + return 
Stats::StatNameTagVector{ + {cluster_label_, cluster_value_}, + }; + } + + Stats::StatNameTagVector getTags(Grpc::Status::GrpcStatus status, + GrpcDisconnectionReason::DisconnectReason reason) { + Stats::StatName status_value = stat_name_pool_.add(std::to_string(status)); + Stats::StatName reason_value = stat_name_pool_.add(GrpcDisconnectionReason::toString(reason)); + + return Stats::StatNameTagVector{ + {cluster_label_, cluster_value_}, + {status_code_, status_value}, + {disconnect_reason_, reason_value}, + }; + } + + Stats::Counter& getCounter(const Stats::StatName& name, Stats::StatNameTagVector&& tags) { + return Stats::Utility::counterFromStatNames(context_.scope(), {stat_prefix_, name}, tags); + } + + Stats::Gauge& getGauge(const Stats::StatName& name, Stats::StatNameTagVector&& tags) { + return Stats::Utility::gaugeFromStatNames(context_.scope(), {stat_prefix_, name}, + Stats::Gauge::ImportMode::NeverImport, tags); + } + + Server::Configuration::ServerFactoryContext& context_; + Stats::StatNamePool stat_name_pool_; + const Stats::StatName stat_prefix_; + + const Stats::StatName disconnects_; + + const Stats::StatName status_code_; + const Stats::StatName disconnect_reason_; + const Stats::StatName cluster_label_; + const Stats::StatName cluster_value_; + + Stats::Counter& connection_attempts_counter_; + Stats::Counter& acks_received_counter_; + Stats::Counter& send_attempts_counter_; + Stats::Counter& sent_accepted_cnt_counter_; + Stats::Counter& sent_removed_cnt_counter_; + Stats::Counter& events_dropped_counter_; + Stats::Counter& queued_events_counter_; + Stats::Counter& out_of_order_acks_counter_; + + Stats::Gauge& send_interval_gauge_; + Stats::Gauge& nonce_current_gauge_; + Stats::Gauge& nonce_acked_gauge_; + }; + + GrpcClientStats stats_; + + friend class GrpcClientTest; +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.cc b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.cc new file mode 100644 index 0000000000000..440ac5ac68da9 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.cc @@ -0,0 +1,36 @@ +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h" + +#include "envoy/registry/registry.h" + +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +ReverseTunnelReporterClientPtr +GrpcClientFactory::createClient(Server::Configuration::ServerFactoryContext& context, + const Protobuf::Message& config) { + const auto& grpc_client_config = + MessageUtil::downcastAndValidate( + config, context.messageValidationVisitor()); + return std::make_unique(context, grpc_client_config); +} + +std::string GrpcClientFactory::name() const { + return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client"; +} + +ProtobufTypes::MessagePtr GrpcClientFactory::createEmptyConfigProto() { + return std::make_unique(); +} + +REGISTER_FACTORY(GrpcClientFactory, ReverseTunnelReporterClientFactory); + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h new file mode 100644 index 0000000000000..765fc5a2d95a7 --- /dev/null +++ 
b/contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h @@ -0,0 +1,27 @@ +#pragma once + +#include "source/common/protobuf/utility.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/grpc_client.pb.validate.h" +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +class GrpcClientFactory : public ReverseTunnelReporterClientFactory { +public: + ReverseTunnelReporterClientPtr createClient(Server::Configuration::ServerFactoryContext& context, + const Protobuf::Message& config) override; + + std::string name() const override; + + ProtobufTypes::MessagePtr createEmptyConfigProto() override; +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/BUILD b/contrib/reverse_tunnel_reporter/source/reporters/BUILD new file mode 100644 index 0000000000000..399bb153885c2 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/BUILD @@ -0,0 +1,16 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "reporters_lib", + deps = [ + "//contrib/reverse_tunnel_reporter/source/reporters/event_reporter:event_reporter_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD new file mode 100644 index 0000000000000..ace33870375d9 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/BUILD @@ -0,0 +1,30 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_library", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_library( + name = "event_reporter_lib", + srcs = [ + "factory.cc", + "reporter.cc", + ], + hdrs = [ + "factory.h", + "reporter.h", + ], + deps = [ + "//contrib/reverse_tunnel_reporter/source:reverse_tunnel_event_types", + "//envoy/extensions/bootstrap/reverse_tunnel:reverse_tunnel_reporter_lib", + "//envoy/registry", + "//source/common/common:logger_lib", + "//source/common/config:utility_lib", + "//source/common/protobuf:utility_lib", + "@envoy_api//contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters:pkg_cc_proto", + ], +) diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc new file mode 100644 index 0000000000000..d4f86fe296d64 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.cc @@ -0,0 +1,58 @@ +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h" + +#include "envoy/registry/registry.h" + +#include "source/common/config/utility.h" +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +ReverseTunnelReporterPtr +EventReporterFactory::createReporter(Server::Configuration::ServerFactoryContext& context, + ProtobufTypes::MessagePtr config) { + const auto& reporter_config = MessageUtil::downcastAndValidate( + *config, context.messageValidationVisitor()); + + std::vector 
clients; + clients.reserve(reporter_config.clients().size()); + for (const auto& client_config : reporter_config.clients()) { + clients.push_back(createClient(context, client_config)); + } + return std::make_unique(context, reporter_config, std::move(clients)); +} + +std::string EventReporterFactory::name() const { + return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_" + "reporter"; +} + +ProtobufTypes::MessagePtr EventReporterFactory::createEmptyConfigProto() { + return std::make_unique(); +} + +ReverseTunnelReporterClientPtr +EventReporterFactory::createClient(Server::Configuration::ServerFactoryContext& context, + const ClientConfigProto& client_config) { + auto* factory = + Config::Utility::getFactoryByName(client_config.name()); + if (!factory) { + throw EnvoyException( + fmt::format("Unknown Reporter Client Factory: '{}'. " + "Make sure it is registered as a ReverseTunnelReporterClientFactory.", + client_config.name())); + } + + auto typed_config = Config::Utility::translateAnyToFactoryConfig( + client_config.typed_config(), context.messageValidationVisitor(), *factory); + return factory->createClient(context, *typed_config); +} + +REGISTER_FACTORY(EventReporterFactory, ReverseTunnelReporterFactory); + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h new file mode 100644 index 0000000000000..14f272506c360 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h @@ -0,0 +1,37 @@ +#pragma once + +#include "envoy/extensions/bootstrap/reverse_tunnel/reverse_tunnel_reporter.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h" +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.validate.h" +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +/// Factory that builds an EventReporter from its proto config, dynamically +/// resolving each child ReverseTunnelReporterClient by name. 
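+/// An unknown client name causes an EnvoyException at configuration load time (see
+/// createClient() below), so a misconfigured client fails fast instead of being silently
+/// dropped.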
+class EventReporterFactory : public ReverseTunnelReporterFactory { +public: + ReverseTunnelReporterPtr createReporter(Server::Configuration::ServerFactoryContext& context, + ProtobufTypes::MessagePtr config) override; + std::string name() const override; + ProtobufTypes::MessagePtr createEmptyConfigProto() override; + +private: + using ConfigProto = + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig; + using ClientConfigProto = envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters:: + ReverseConnectionReporterClient; + + ReverseTunnelReporterClientPtr createClient(Server::Configuration::ServerFactoryContext& context, + const ClientConfigProto& client_config); +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc new file mode 100644 index 0000000000000..14c0a0823e761 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.cc @@ -0,0 +1,125 @@ +#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" + +#include "source/common/protobuf/utility.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +EventReporter::EventReporter(Server::Configuration::ServerFactoryContext& context, + const ConfigProto& config, + std::vector&& clients) + : context_{context}, clients_{std::move(clients)}, + stats_(generateStats( + PROTOBUF_GET_STRING_OR_DEFAULT(config, stat_prefix, "reverse_tunnel_reporter"), + context.scope())) { + ENVOY_LOG(info, "EventReporter: Constructed with {} clients", clients_.size()); +} + +void EventReporter::onServerInitialized() { + ENVOY_LOG(info, "EventReporter: Initialized"); + for (auto& client : clients_) { + client->onServerInitialized(this); + } +} + +void EventReporter::reportConnectionEvent(absl::string_view node_id, absl::string_view cluster_id, + absl::string_view tenant_id) { + auto ptr = std::make_shared( + ReverseTunnelEvent::Connected{std::string(node_id), std::string(cluster_id), + std::string(tenant_id), Envoy::SystemTime::clock::now()}); + + context_.mainThreadDispatcher().post( + [this, ptr = std::move(ptr)]() mutable { this->addConnection(std::move(ptr)); }); +} + +void EventReporter::reportDisconnectionEvent(absl::string_view node_id, + absl::string_view cluster_id) { + std::string name = ReverseTunnelEvent::getName(node_id, cluster_id); + auto ptr = std::make_shared( + ReverseTunnelEvent::Disconnected{std::move(name)}); + + context_.mainThreadDispatcher().post( + [this, ptr = std::move(ptr)]() mutable { this->removeConnection(std::move(ptr)); }); +} + +// This is only served on the main thread so no locks needed. 
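+// Callers (e.g. the gRPC client when it builds a full push) reach this through the
+// ReverseTunnelReporterWithState pointer handed to them in onServerInitialized().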
+ReverseTunnelEvent::SharedConnections EventReporter::getAllConnections() { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + stats_.reverse_tunnel_full_pulls_total_.inc(); + + ReverseTunnelEvent::SharedConnections all_connections; + all_connections.reserve(connections_.size()); + + for (auto& [key, val] : connections_) { + all_connections.push_back(val.connection); + } + return all_connections; +} + +EventReporterStats EventReporter::generateStats(const std::string& prefix, Stats::Scope& scope) { + return EventReporterStats{ALL_EVENT_REPORTER_STATS(POOL_COUNTER_PREFIX(scope, prefix), + POOL_GAUGE_PREFIX(scope, prefix))}; +} + +void EventReporter::notifyClients(ReverseTunnelEvent::BatchedEvents&& batch) { + for (auto& client : clients_) { + client->receiveEvents(batch); + } +} + +void EventReporter::addConnection(std::shared_ptr&& connection) { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + + ENVOY_LOG(info, "EventReporter: Accepted a new connection. Node: {}, Cluster: {}, Tenant: {}", + connection->node_id, connection->cluster_id, connection->tenant_id); + + std::string name = ReverseTunnelEvent::getName(connection->node_id, connection->cluster_id); + auto [it, inserted] = + connections_.try_emplace(std::move(name), ConnectionEntry{std::move(connection), 1}); + + if (inserted) { + stats_.reverse_tunnel_unique_active_.inc(); + notifyClients(ReverseTunnelEvent::BatchedEvents{{it->second.connection}, {}}); + } else { + // Multiple reverse tunnels can share the same name (same node). + // We ref-count them and only notify clients of removal when the last one disconnects. + it->second.count++; + } + + stats_.reverse_tunnel_established_total_.inc(); + stats_.reverse_tunnel_active_.inc(); +} + +void EventReporter::removeConnection( + std::shared_ptr&& disconnection) { + ASSERT(context_.mainThreadDispatcher().isThreadSafe()); + + const auto& name = disconnection->name; + auto it = connections_.find(name); + + ENVOY_LOG(info, "EventReporter: Removed connection. Name: {}", name); + + if (it == connections_.end()) { + ENVOY_LOG(warn, "EventReporter: Tried to remove a connection which doesnt exist"); + return; + } + + // Only notify removal on the last ref — see addConnection for the ref-count rationale. 
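+  // Example: two tunnels sharing the name "node-a:cluster-1" -> the first disconnect only
+  // decrements the count; the second erases the entry and notifies clients of the removal.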
+ if (it->second.count == 1) { + connections_.erase(it); + stats_.reverse_tunnel_unique_active_.dec(); + notifyClients(ReverseTunnelEvent::BatchedEvents{{}, {disconnection}}); + } else { + it->second.count--; + } + + stats_.reverse_tunnel_closed_total_.inc(); + stats_.reverse_tunnel_active_.dec(); +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h new file mode 100644 index 0000000000000..4c07f661acf80 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h @@ -0,0 +1,66 @@ +#pragma once + +#include + +#include "envoy/stats/stats_macros.h" + +#include "source/common/common/logger.h" + +#include "contrib/envoy/extensions/reverse_tunnel_reporters/v3alpha/reporters/event_reporter.pb.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +#define ALL_EVENT_REPORTER_STATS(COUNTER, GAUGE) \ + COUNTER(reverse_tunnel_established_total) \ + COUNTER(reverse_tunnel_closed_total) \ + COUNTER(reverse_tunnel_full_pulls_total) \ + GAUGE(reverse_tunnel_active, Accumulate) \ + GAUGE(reverse_tunnel_unique_active, Accumulate) + +struct EventReporterStats { + ALL_EVENT_REPORTER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT) +}; + +struct ConnectionEntry { + std::shared_ptr connection; + std::size_t count; +}; + +/// Aggregates reverse-tunnel connection/disconnection events, de-duplicates by +/// name, maintains stats, and fans out batched diffs as shared ptrs to registered clients. +class EventReporter : public ReverseTunnelReporterWithState, + public Logger::Loggable { +public: + using ConfigProto = + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig; + + EventReporter(Server::Configuration::ServerFactoryContext& context, const ConfigProto& config, + std::vector&& clients); + + void onServerInitialized() override; + void reportConnectionEvent(absl::string_view node_id, absl::string_view cluster_id, + absl::string_view tenant_id) override; + void reportDisconnectionEvent(absl::string_view node_id, absl::string_view cluster_id) override; + ReverseTunnelEvent::SharedConnections getAllConnections() override; + +private: + static EventReporterStats generateStats(const std::string& prefix, Stats::Scope& scope); + void notifyClients(ReverseTunnelEvent::BatchedEvents&& batch); + void addConnection(std::shared_ptr&& connection); + void removeConnection(std::shared_ptr&& disconnection); + + Server::Configuration::ServerFactoryContext& context_; + std::vector clients_; + EventReporterStats stats_; + // Keyed by getName(node_id, cluster_id). 
+ absl::flat_hash_map connections_; +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h b/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h new file mode 100644 index 0000000000000..4585c55fa6f6f --- /dev/null +++ b/contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h @@ -0,0 +1,112 @@ +#pragma once + +#include +#include +#include + +#include "envoy/common/pure.h" +#include "envoy/common/time.h" +#include "envoy/config/typed_config.h" +#include "envoy/extensions/bootstrap/reverse_tunnel/reverse_tunnel_reporter.h" +#include "envoy/server/factory_context.h" + +#include "source/common/common/fmt.h" + +#include "absl/strings/str_cat.h" + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +// The namespace holding the structs for the reverse tunnel events +namespace ReverseTunnelEvent { +// Builds the canonical connection name used as the de-duplication key in the reporter +// and as the tunnel name in the gRPC proto. +// TODO(aakugan) look into returning string_view and owning the data in the connection struct alone. +inline std::string getName(absl::string_view node_id, absl::string_view cluster_id) { + return absl::StrCat(node_id, ":", cluster_id); +} + +struct Connected { + std::string node_id; + std::string cluster_id; + std::string tenant_id; + Envoy::SystemTime created_at; +}; + +struct Disconnected { + std::string name; +}; + +using SharedConnections = std::vector>; +using SharedDisconnections = std::vector>; + +struct BatchedEvents { + SharedConnections connections; + SharedDisconnections disconnections; + + std::size_t size() const { return connections.size() + disconnections.size(); } + + void operator+=(BatchedEvents&& events) { + connections.reserve(connections.size() + events.connections.size()); + disconnections.reserve(disconnections.size() + events.disconnections.size()); + + for (auto& conn : events.connections) { + connections.push_back(std::move(conn)); + } + + for (auto& disconn : events.disconnections) { + disconnections.push_back(std::move(disconn)); + } + + events.connections.clear(); + events.disconnections.clear(); + } + + void clear() { + connections.clear(); + disconnections.clear(); + } +}; +} // namespace ReverseTunnelEvent + +// This will own the clients and expose an api for them to get the full state. +// This allows multiple clients to share data -> clients can focus on sending the data alone. +class ReverseTunnelReporterWithState : public ReverseTunnelReporter { +public: + virtual ~ReverseTunnelReporterWithState() = default; + + virtual ReverseTunnelEvent::SharedConnections getAllConnections() PURE; +}; + +using ReverseTunnelReporterWithStatePtr = std::unique_ptr; + +// This gets the ptr to the reporter for polling all the active connections. +// This also has receiveEvents to get the diff events from the reporter. 
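+// Call contract (as implemented by EventReporter): onServerInitialized(reporter) is invoked
+// once after the server has initialized; receiveEvents() is then invoked on the main thread
+// for each batched diff, and implementations may call reporter->getAllConnections() for a
+// full snapshot while on the main thread.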
+class ReverseTunnelReporterClient { +public: + virtual ~ReverseTunnelReporterClient() = default; + + virtual void onServerInitialized(ReverseTunnelReporterWithState* reporter) PURE; + + virtual void receiveEvents(ReverseTunnelEvent::BatchedEvents events) PURE; +}; + +using ReverseTunnelReporterClientPtr = std::unique_ptr; + +class ReverseTunnelReporterClientFactory : public Config::TypedFactory { +public: + virtual ReverseTunnelReporterClientPtr + createClient(Server::Configuration::ServerFactoryContext& context, + const Protobuf::Message& config) PURE; + + std::string category() const override { + return "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients"; + } +}; + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/contrib/reverse_tunnel_reporter/test/clients/BUILD b/contrib/reverse_tunnel_reporter/test/clients/BUILD new file mode 100644 index 0000000000000..feeb167efd4d7 --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/clients/BUILD @@ -0,0 +1,28 @@ +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_test", + "envoy_contrib_package", +) + +licenses(["notice"]) # Apache 2 + +envoy_contrib_package() + +envoy_cc_test( + name = "grpc_client_test", + srcs = ["grpc_client_test.cc"], + deps = [ + "//contrib/reverse_tunnel_reporter/source/clients/grpc_client:grpc_client_lib", + "//envoy/grpc:async_client_interface", + "//envoy/registry", + "//source/common/api:api_lib", + "//source/common/protobuf:utility_lib_header", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/server:server_factory_context_mocks", + "//test/mocks/upstream:cluster_info_mocks", + "//test/mocks/upstream:thread_local_cluster_mocks", + "//test/test_common:registry_lib", + "//test/test_common:utility_lib", + ], +) diff --git a/contrib/reverse_tunnel_reporter/test/clients/grpc_client_test.cc b/contrib/reverse_tunnel_reporter/test/clients/grpc_client_test.cc new file mode 100644 index 0000000000000..47843ec64a47a --- /dev/null +++ b/contrib/reverse_tunnel_reporter/test/clients/grpc_client_test.cc @@ -0,0 +1,835 @@ +#include + +#include "envoy/grpc/async_client.h" +#include "envoy/registry/registry.h" + +#include "source/common/grpc/common.h" +#include "source/common/protobuf/utility.h" + +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/server/server_factory_context.h" +#include "test/mocks/upstream/cluster_info.h" +#include "test/mocks/upstream/thread_local_cluster.h" +#include "test/test_common/utility.h" + +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/client.h" +#include "contrib/reverse_tunnel_reporter/source/clients/grpc_client/factory.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +class MockReverseTunnelReporter : public ReverseTunnelReporterWithState { +public: + MOCK_METHOD(void, onServerInitialized, (), (override)); + MOCK_METHOD(void, reportConnectionEvent, + (absl::string_view, absl::string_view, absl::string_view), (override)); + MOCK_METHOD(void, reportDisconnectionEvent, (absl::string_view, absl::string_view), (override)); + MOCK_METHOD(ReverseTunnelEvent::SharedConnections, getAllConnections, (), (override)); +}; + +GrpcConfigProto mock_config() { + GrpcConfigProto 
+  GrpcConfigProto config_;
+
+  config_.set_cluster("test_cluster");
+  config_.mutable_default_send_interval()->set_seconds(2);
+  config_.mutable_connect_retry_interval()->set_seconds(2);
+  config_.set_max_retries(2);
+  config_.set_max_buffer(1000);
+  config_.set_stat_prefix("test.grpc_client");
+
+  return config_;
+}
+
+ReverseTunnelEvent::SharedConnections make_connections(std::vector<std::string> node_ids) {
+  ReverseTunnelEvent::SharedConnections connections;
+  std::string cluster = "test_cluster";
+  std::string tenant = "test_tenant";
+
+  for (const auto& node : node_ids) {
+    auto conn = std::make_shared<ReverseTunnelEvent::Connected>(ReverseTunnelEvent::Connected{
+        node, cluster, tenant, std::chrono::system_clock::time_point(std::chrono::seconds(1))});
+    connections.push_back(std::move(conn));
+  }
+
+  return connections;
+}
+
+ReverseTunnelEvent::SharedDisconnections make_disconnections(std::vector<std::string> node_ids) {
+  std::string cluster = "test_cluster";
+  ReverseTunnelEvent::SharedDisconnections disconnections;
+
+  for (const auto& node : node_ids) {
+    auto disconn = std::make_shared<ReverseTunnelEvent::Disconnected>(
+        ReverseTunnelEvent::Disconnected{ReverseTunnelEvent::getName(node, cluster)});
+    disconnections.push_back(std::move(disconn));
+  }
+
+  return disconnections;
+}
+
+StreamTunnelsResp validate_req(Buffer::InstancePtr& request,
+                               const ReverseTunnelEvent::BatchedEvents& actual, bool full_push) {
+  StreamTunnelsReq req;
+  bool success = Grpc::Common::parseBufferInstance(std::move(request), req);
+  EXPECT_EQ(success, true);
+
+  EXPECT_EQ(req.added_tunnels_size(), actual.connections.size());
+  EXPECT_EQ(req.removed_tunnel_names_size(), actual.disconnections.size());
+
+  for (std::size_t i = 0; i < actual.connections.size(); i++)
+    EXPECT_EQ(actual.connections[i]->node_id,
+              req.added_tunnels(i).tunnel_info().identity().node_id());
+
+  for (std::size_t i = 0; i < actual.disconnections.size(); i++)
+    EXPECT_EQ(actual.disconnections[i]->name, req.removed_tunnel_names(i));
+
+  EXPECT_EQ(full_push, req.full_push());
+
+  StreamTunnelsResp resp;
+  resp.set_request_nonce(req.nonce());
+
+  return resp;
+}
+
+Protobuf::Duration getHalfDuration(const Protobuf::Duration& dur) {
+  return Protobuf::util::TimeUtil::MillisecondsToDuration(
+      DurationUtil::durationToMilliseconds(dur) / 2);
+}
+
+class GrpcClientTest : public testing::Test {
+public:
+  GrpcClientTest() {}
+
+  void SetUp() override {
+    api_ = Api::createApiForTest(time_system_);
+    dispatcher_ = api_->allocateDispatcher("test_thread");
+
+    ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher_));
+    ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store_.rootScope()));
+    ON_CALL(context_, clusterManager()).WillByDefault(ReturnRef(cm_));
+
+    ON_CALL(context_, localInfo()).WillByDefault(ReturnRef(local_info_));
+    ON_CALL(local_info_, nodeName()).WillByDefault(ReturnRef(node_id));
+    ON_CALL(local_info_, clusterName()).WillByDefault(ReturnRef(cluster_id));
+
+    ON_CALL(cm_, getThreadLocalCluster(_)).WillByDefault(Return(&thread_local_cluster_));
+    ON_CALL(thread_local_cluster_, info()).WillByDefault(Return(cluster_info_));
+    ON_CALL(*cluster_info_, statsScope()).WillByDefault(ReturnRef(*stats_store_.rootScope()));
+
+    ON_CALL(cm_, grpcAsyncClientManager()).WillByDefault(ReturnRef(manager_));
+    ON_CALL(manager_, getOrCreateRawAsyncClient(_, _, _))
+        .WillByDefault(Return(absl::StatusOr<Grpc::RawAsyncClientSharedPtr>(async_client_)));
+  }
+
+protected:
+  void inc_time(const Protobuf::Duration& dur) {
+    time_system_.advanceTimeAsyncImpl(
+        std::chrono::milliseconds(DurationUtil::durationToMilliseconds(dur)));
+    dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
+  }
+
+  void get_stream(int times) {
+    EXPECT_CALL(*async_client_, startRaw(_, _, _, _))
+        .Times(times)
+        .WillRepeatedly(Invoke([this](absl::string_view, absl::string_view,
+                                      Grpc::RawAsyncStreamCallbacks& callbacks,
+                                      const Http::AsyncClient::StreamOptions&) {
+          callbacks_ = &callbacks;
+          return async_stream_.get();
+        }));
+  }
+
+  GrpcClient::GrpcClientStats getStats() {
+    return GrpcClient::GrpcClientStats{context_, config_.stat_prefix(), config_.cluster()};
+  }
+
+  std::shared_ptr<NiceMock<Grpc::MockAsyncClient>> async_client_{
+      std::make_shared<NiceMock<Grpc::MockAsyncClient>>()};
+  std::unique_ptr<NiceMock<Grpc::MockAsyncStream>> async_stream_{
+      std::make_unique<NiceMock<Grpc::MockAsyncStream>>()};
+  Grpc::RawAsyncStreamCallbacks* callbacks_;
+
+  NiceMock<Server::Configuration::MockServerFactoryContext> context_;
+  NiceMock<Upstream::MockClusterManager> cm_;
+  NiceMock<Upstream::MockThreadLocalCluster> thread_local_cluster_;
+  NiceMock<LocalInfo::MockLocalInfo> local_info_;
+  NiceMock<Grpc::MockAsyncClientManager> manager_;
+  std::shared_ptr<NiceMock<Upstream::MockClusterInfo>> cluster_info_{
+      std::make_shared<NiceMock<Upstream::MockClusterInfo>>()};
+
+  Api::ApiPtr api_;
+  Event::SimulatedTimeSystem time_system_;
+  Event::DispatcherPtr dispatcher_;
+  Stats::IsolatedStoreImpl stats_store_;
+
+  std::string node_id{"tunnel-v2"};
+  std::string cluster_id{"tunnel-v2"};
+
+  GrpcConfigProto config_{mock_config()};
+  NiceMock<MockReverseTunnelReporter> mock_reporter_;
+};
+
+// Check the connection behaviour on server initialization (infinite retries)
TEST_F(GrpcClientTest, RetryAttemptsOnStreamCreationFailure) {
+  GrpcClient client{context_, config_};
+  auto stats_{getStats()};
+
+  // The connection attempts should not be bounded by anything.
+  // Making it config_.max_retries + 2 as a simple check that they are not bounded by max_retries.
+  EXPECT_CALL(*async_client_, startRaw(_, _, _, _))
+      .Times(config_.max_retries() + 2)
+      .WillRepeatedly(Return(nullptr));
+
+  client.onServerInitialized(&mock_reporter_);
+
+  for (std::size_t i = 0; i < config_.max_retries() + 1; i++) {
+    inc_time(config_.connect_retry_interval());
+  }
+
+  // Not incremented because no connection attempt was successful.
+  // startRaw => nullptr.
+  EXPECT_EQ(stats_.connection_attempts_counter_.value(), 0);
+  EXPECT_EQ(stats_.send_attempts_counter_.value(), 0);
+  EXPECT_EQ(stats_.nonce_acked_gauge_.value(), 0);
+  EXPECT_EQ(stats_.nonce_current_gauge_.value(), 0);
+  EXPECT_EQ(stats_
+                .getCounter(stats_.disconnects_,
+                            stats_.getTags(
+                                Grpc::Status::WellKnownGrpcStatus::Internal,
+                                GrpcDisconnectionReason::DisconnectReason::STREAM_CREATION_FAILED))
+                .value(),
+            config_.max_retries() + 2);
+}
+
+// Checks the happy path -> server connect and full push.
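+// On a fresh stream the client pulls every known connection from the reporter via
+// getAllConnections() and sends them in a single request flagged full_push=true; later
+// requests carry only the diffs delivered through receiveEvents().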
+TEST_F(GrpcClientTest, ClientSendsFullPushOnConnect) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events{make_connections({"node_1"}), {}}; + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Return(events.connections)); + + get_stream(1); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([&events](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, true); + EXPECT_EQ(resp.request_nonce(), 1); + })); + + client.onServerInitialized(&mock_reporter_); + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.acks_received_counter_.value(), 0); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.events_dropped_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), 1); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), 0); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), 1); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), 0); + EXPECT_EQ(stats_.send_interval_gauge_.value(), + DurationUtil::durationToMilliseconds(config_.default_send_interval())); + EXPECT_EQ(stats_.sent_accepted_cnt_counter_.value(), 1); + EXPECT_EQ(stats_.sent_removed_cnt_counter_.value(), 0); +} + +// Checks the happy path -> Server up connect and send the diff after the full push +TEST_F(GrpcClientTest, ClientSendsDiffAfterFullPush) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + int cur = 0, total = 4; + + ReverseTunnelEvent::BatchedEvents batches[] = { + ReverseTunnelEvent::BatchedEvents{make_connections({"node_1"}), {}}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_2"}), + make_disconnections({"node_1"})}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_3", "node_4"}), {}}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_5"}), + make_disconnections({"node_3"})}}; + + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Invoke([&batches]() { + return batches[0].connections; + })); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(total) + .WillRepeatedly(Invoke([this, &batches, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, batches[cur], cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + client.onServerInitialized(&mock_reporter_); + cur++; + + for (; cur < total; cur++) { + client.receiveEvents(batches[cur]); + inc_time(config_.default_send_interval()); + } + + int total_events = 0; + for (int i = 0; i < total; i++) { + total_events += batches[i].size(); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.acks_received_counter_.value(), total); + EXPECT_EQ(stats_.send_attempts_counter_.value(), total); + EXPECT_EQ(stats_.sent_accepted_cnt_counter_.value(), 5); + EXPECT_EQ(stats_.sent_removed_cnt_counter_.value(), 2); + EXPECT_EQ(stats_.events_dropped_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), total_events); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), 0); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), total); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), total); + EXPECT_EQ(stats_.send_interval_gauge_.value(), + DurationUtil::durationToMilliseconds(config_.default_send_interval())); +} + +// Check the happy path -> config changes from the server response is applied +TEST_F(GrpcClientTest, ReportIntervalChangesReflectInClient) { + 
GrpcClient client{context_, config_}; + auto stats_{getStats()}; + int cur = 0; + + ReverseTunnelEvent::BatchedEvents events; + + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Invoke([&events]() { + return events.connections; + })); + + // This should be called 3 times. + // Once for the first message and then twice for the next two increments. + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(3) + .WillRepeatedly(Invoke([this, &events, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, cur == 0); + EXPECT_EQ(resp.request_nonce(), ++cur); + + if (cur == 1) { + *resp.mutable_report_interval() = getHalfDuration(config_.default_send_interval()); + } + + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_EQ(stats_.send_interval_gauge_.value(), + DurationUtil::durationToMilliseconds(config_.default_send_interval())); + client.onServerInitialized(&mock_reporter_); + EXPECT_EQ(stats_.send_interval_gauge_.value(), + DurationUtil::durationToMilliseconds(getHalfDuration(config_.default_send_interval()))); + + // This is already scheduled from the next time we will use the half interval for sending. + inc_time(config_.default_send_interval()); + inc_time(getHalfDuration(config_.default_send_interval())); + + EXPECT_EQ(stats_.send_attempts_counter_.value(), 3); +} + +// Check edge case -> Full push and then diffs on reconnect +TEST_F(GrpcClientTest, FullPushAndDiffOnReconnect) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + int cur = 0, total = 4; + + ReverseTunnelEvent::BatchedEvents batches[] = { + ReverseTunnelEvent::BatchedEvents{make_connections({"node_1"}), {}}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_2"}), + make_disconnections({"node_1"})}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_3", "node_4"}), {}}, + ReverseTunnelEvent::BatchedEvents{make_connections({"node_5"}), + make_disconnections({"node_3"})}}; + + // 2 stream creations: initial connect + reconnect after remote close. + get_stream(2); + + // getAllConnections is called once per full push (initial + reconnect). + EXPECT_CALL(mock_reporter_, getAllConnections()).Times(2).WillRepeatedly(Invoke([&batches]() { + return batches[0].connections; + })); + + // total + 1: the reconnect triggers an extra full push on top of the normal total sends. + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(total + 1) + .WillRepeatedly(Invoke([this, &batches, &cur](Buffer::InstancePtr& request, bool) { + // cur is still 0 during both the initial connect and the reconnect full push, + // so both correctly validate as full_push=true against batches[0]. + auto resp = validate_req(request, batches[cur], cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + client.onServerInitialized(&mock_reporter_); + + // disconnect and reconnect -> sends the full push automatically + callbacks_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Unknown, "Testing"); + inc_time(config_.connect_retry_interval()); + cur++; // cur becomes 1 only after the reconnect full push has already fired. 
+ + for (; cur < total; cur++) { + client.receiveEvents(batches[cur]); + inc_time(config_.default_send_interval()); + } + + int total_sz = 0; + for (int i = 0; i < total; i++) { + total_sz += batches[i].size(); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 2); + EXPECT_EQ(stats_.acks_received_counter_.value(), total + 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), total + 1); + // 6 = 1 (initial full push) + 1 (reconnect full push) + 1+2+1 from batches[1..3]. + EXPECT_EQ(stats_.sent_accepted_cnt_counter_.value(), 6); + EXPECT_EQ(stats_.sent_removed_cnt_counter_.value(), 2); + EXPECT_EQ(stats_.events_dropped_counter_.value(), 0); + // +batches[0].size(): the reconnect full push re-queues the initial connections. + EXPECT_EQ(stats_.queued_events_counter_.value(), total_sz + batches[0].size()); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), 0); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), total); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), total); + EXPECT_EQ(stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Unknown, + GrpcDisconnectionReason::DisconnectReason::REMOTE_CLOSE)) + .value(), + 1); +} + +// Check edge case -> Disconnect on deadline exceeded +TEST_F(GrpcClientTest, DisconnectOnTooManyUnAckedRequests) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + std::size_t cur = 0; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + // max_retries + 1: the initial connect sends once, then max_retries timer ticks each send once. + // On the next timer tick send() sees (nonce_current_ - nonce_acked_) > max_retries and + // disconnects before calling sendMessage, so the total successful sends is max_retries + 1. + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(config_.max_retries() + 1) + .WillRepeatedly(Invoke([&events, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + + // No ACK is sent. It should eventually disconnect. + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + cur++; + + // max_retries + 2: we need max_retries timer ticks for the sends, plus one more tick + // to trigger the disconnect check. The first send happens on connect (cur=0). + for (; cur < config_.max_retries() + 2; cur++) { + inc_time(config_.default_send_interval()); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.acks_received_counter_.value(), 0); + EXPECT_EQ(stats_.send_attempts_counter_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), 0); + EXPECT_EQ(stats_ + .getCounter( + stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded, + GrpcDisconnectionReason::DisconnectReason::MAX_RETRIES_EXCEEDED)) + .value(), + 1); +} + +// Check edge case -> Disconnect with Server on NACK. 
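+// A response carrying a populated error_detail is treated as a NACK: the client resets the
+// stream and records the disconnect under the NACK_RECEIVED reason.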
+TEST_F(GrpcClientTest, DisconnectOnNack) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([this, &events](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, true); + EXPECT_EQ(resp.request_nonce(), 1); + resp.mutable_error_detail()->set_code(Grpc::Status::WellKnownGrpcStatus::Unavailable); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 1); + EXPECT_EQ( + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Aborted, + GrpcDisconnectionReason::DisconnectReason::NACK_RECEIVED)) + .value(), + 1); +} + +// Check edge case -> Disconnect on Buffer Full (Also ensure that full push has no limits) +TEST_F(GrpcClientTest, DisconnectOnBufferFull) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + std::vector nodes; + for (std::size_t i = 0; i < config_.max_buffer() + 1; i++) { + nodes.push_back("node_" + std::to_string(i)); + } + + ReverseTunnelEvent::BatchedEvents connect_events{make_connections(nodes), {}}; + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce([&connect_events]() { + return connect_events.connections; + }); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([this, &connect_events](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, connect_events, true); + EXPECT_EQ(resp.request_nonce(), 1); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + client.receiveEvents(connect_events); + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.events_dropped_counter_.value(), nodes.size()); + EXPECT_EQ(stats_.sent_accepted_cnt_counter_.value(), nodes.size()); + EXPECT_EQ(stats_.sent_removed_cnt_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), nodes.size()); + EXPECT_EQ( + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, + GrpcDisconnectionReason::DisconnectReason::BUFFER_OVERFLOW)) + .value(), + 1); +} + +// Check edge case -> Prev Nonce ignored +TEST_F(GrpcClientTest, OutOfOrderNonce) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + std::size_t cur = 0; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + // Send nonce=0 (already acked) for all responses to trigger out-of-order. + // nonce=0 is always <= nonce_acked_ (which starts at 0), so every response lands + // in the else branch and increments out_of_order_acks_counter_. + // Same +1/+2 arithmetic as DisconnectOnTooManyUnAckedRequests: max_retries + 1 sends + // succeed before the disconnect fires. 
+ EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(config_.max_retries() + 1) + .WillRepeatedly(Invoke([this, &events, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + + resp.set_request_nonce(0); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + cur++; + + for (; cur < config_.max_retries() + 2; cur++) { + inc_time(config_.default_send_interval()); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), config_.max_retries() + 1); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), 0); + EXPECT_EQ(stats_ + .getCounter( + stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded, + GrpcDisconnectionReason::DisconnectReason::MAX_RETRIES_EXCEEDED)) + .value(), + 1); +} + +// Check edge case -> Skip Nonce +TEST_F(GrpcClientTest, SkipNonce) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + std::size_t cur = 0; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + // max_retries + 2: the normal max_retries + 1 sends that would trigger disconnect, + // but a late ACK at iteration max_retries advances nonce_acked_ and buys one more send. + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .Times(config_.max_retries() + 2) + .WillRepeatedly(Invoke([this, &events, &cur](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, cur == 0); + EXPECT_EQ(resp.request_nonce(), cur + 1); + + // Only ACK the nonce at iteration max_retries, proving a single late ACK + // advances the watermark and prevents disconnect. + if (cur == config_.max_retries()) { + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + } + })); + + client.onServerInitialized(&mock_reporter_); + cur++; + + for (; cur < config_.max_retries() + 2; cur++) { + inc_time(config_.default_send_interval()); + } + + // After the stream is up, retroactively send ACKs for earlier nonces. + // These are all below nonce_acked_ now, so they count as out-of-order. 
+ for (std::size_t i = 1; i <= config_.max_retries(); i++) { + StreamTunnelsResp resp; + resp.set_request_nonce(i); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + } + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), config_.max_retries() + 2); + EXPECT_EQ(stats_.out_of_order_acks_counter_.value(), config_.max_retries()); + EXPECT_EQ(stats_.acks_received_counter_.value(), 1); + EXPECT_EQ(stats_.nonce_current_gauge_.value(), config_.max_retries() + 2); + EXPECT_EQ(stats_.nonce_acked_gauge_.value(), config_.max_retries() + 1); +} + +// Edge case -> Remote close Status Ok +TEST_F(GrpcClientTest, OkRemoteClose) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([&events](Buffer::InstancePtr& request, bool) { + auto response = validate_req(request, events, true); + EXPECT_EQ(response.request_nonce(), 1); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + callbacks_->onRemoteClose(Grpc::Status::WellKnownGrpcStatus::Ok, "Testing"); + + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 1); + EXPECT_EQ(stats_.send_attempts_counter_.value(), 1); + EXPECT_EQ(stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::Ok, + GrpcDisconnectionReason::DisconnectReason::REMOTE_CLOSE)) + .value(), + 1); +} + +// --- Hardening tests --- + +TEST_F(GrpcClientTest, ReceiveEventsBeforeInitialized) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events{make_connections({"node_1"}), {}}; + client.receiveEvents(std::move(events)); + + EXPECT_EQ(stats_.queued_events_counter_.value(), 0); + EXPECT_EQ(stats_.events_dropped_counter_.value(), 0); +} + +TEST_F(GrpcClientTest, ClusterNotFoundLogsAndReturns) { + GrpcClient client{context_, config_}; + + EXPECT_CALL(cm_, getThreadLocalCluster(_)).WillOnce(Return(nullptr)); + + client.onServerInitialized(&mock_reporter_); + + ReverseTunnelEvent::BatchedEvents events{make_connections({"node_1"}), {}}; + client.receiveEvents(std::move(events)); + + auto stats_{getStats()}; + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), 0); +} + +TEST_F(GrpcClientTest, ClientCreationFailureLogsAndReturns) { + GrpcClient client{context_, config_}; + + EXPECT_CALL(manager_, getOrCreateRawAsyncClient(_, _, _)) + .WillOnce(Return(absl::InvalidArgumentError("Bad Karma"))); + + client.onServerInitialized(&mock_reporter_); + + ReverseTunnelEvent::BatchedEvents events{make_connections({"node_1"}), {}}; + client.receiveEvents(std::move(events)); + + auto stats_{getStats()}; + EXPECT_EQ(stats_.connection_attempts_counter_.value(), 0); + EXPECT_EQ(stats_.queued_events_counter_.value(), 0); +} + +TEST_F(GrpcClientTest, BufferOverflowWhileDisconnectedDoesNotRearmRetry) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + std::vector nodes; + for (std::size_t i = 0; i < config_.max_buffer() + 1; i++) { + nodes.push_back("node_" + std::to_string(i)); + } + + ReverseTunnelEvent::BatchedEvents big_batch{make_connections(nodes), {}}; + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Return(big_batch.connections)); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([this, 
&big_batch](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, big_batch, true); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + EXPECT_CALL(*async_stream_, resetStream()); + + client.onServerInitialized(&mock_reporter_); + + // First overflow: stream is alive, should disconnect. + client.receiveEvents(ReverseTunnelEvent::BatchedEvents{make_connections(nodes), {}}); + EXPECT_EQ( + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, + GrpcDisconnectionReason::DisconnectReason::BUFFER_OVERFLOW)) + .value(), + 1); + + // Second overflow: stream is null, should NOT increment disconnect counter. + client.receiveEvents(ReverseTunnelEvent::BatchedEvents{make_connections(nodes), {}}); + EXPECT_EQ( + stats_ + .getCounter(stats_.disconnects_, + stats_.getTags(Grpc::Status::WellKnownGrpcStatus::ResourceExhausted, + GrpcDisconnectionReason::DisconnectReason::BUFFER_OVERFLOW)) + .value(), + 1); + + // Events still counted as dropped both times. + EXPECT_EQ(stats_.events_dropped_counter_.value(), nodes.size() * 2); +} + +TEST_F(GrpcClientTest, MinSendIntervalFloor) { + GrpcClient client{context_, config_}; + auto stats_{getStats()}; + + ReverseTunnelEvent::BatchedEvents events; + get_stream(1); + + EXPECT_CALL(mock_reporter_, getAllConnections()).WillOnce(Return(events.connections)); + + EXPECT_CALL(*async_stream_, sendMessageRaw_(_, false)) + .WillOnce(Invoke([this, &events](Buffer::InstancePtr& request, bool) { + auto resp = validate_req(request, events, true); + + // Server tries to set interval to 10ms, below the 50ms floor. + *resp.mutable_report_interval() = Protobuf::util::TimeUtil::MillisecondsToDuration(10); + callbacks_->onReceiveMessageRaw(Grpc::Common::serializeMessage(resp)); + })); + + client.onServerInitialized(&mock_reporter_); + + // Should be clamped to kMinSendInterval (50ms), not the server's 10ms. + EXPECT_EQ(stats_.send_interval_gauge_.value(), kMinSendInterval.count()); +} + +// Verify default values when proto fields are unset/zero. 
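+// The values asserted below are the parsed defaults when every optional proto field is left
+// unset: 5s send and retry intervals, 5 retries, a 1,000,000-event buffer, and the default
+// stat prefix.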
+TEST_F(GrpcClientTest, ConfigDefaults) {
+  GrpcConfigProto bare_config;
+  bare_config.set_cluster("test_cluster");
+
+  GrpcClientConfig parsed(bare_config);
+
+  EXPECT_EQ(parsed.stat_prefix, "reverse_tunnel_reporter_client.grpc_client");
+  EXPECT_EQ(parsed.cluster, "test_cluster");
+  EXPECT_EQ(parsed.send_interval.count(), 5000);
+  EXPECT_EQ(parsed.connect_retry_interval.count(), 5000);
+  EXPECT_EQ(parsed.max_retries, 5);
+  EXPECT_EQ(parsed.max_buffer, 1000000);
+}
+
+class GrpcClientFactoryTest : public testing::Test {
+public:
+  void SetUp() override {
+    factory_ = Registry::FactoryRegistry<ReverseTunnelReporterClientFactory>::getFactory(
+        "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client");
+    ASSERT_NE(nullptr, factory_);
+
+    ON_CALL(context_, messageValidationVisitor())
+        .WillByDefault(ReturnRef(ProtobufMessage::getStrictValidationVisitor()));
+  }
+
+protected:
+  ReverseTunnelReporterClientFactory* factory_{};
+  NiceMock<Server::Configuration::MockServerFactoryContext> context_;
+};
+
+TEST_F(GrpcClientFactoryTest, Name) {
+  EXPECT_EQ("envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.clients.grpc_client",
+            factory_->name());
+}
+
+TEST_F(GrpcClientFactoryTest, CreateEmptyConfigProto) {
+  auto config = factory_->createEmptyConfigProto();
+  EXPECT_NE(nullptr, config);
+  EXPECT_NE(nullptr, dynamic_cast<GrpcConfigProto*>(config.get()));
+}
+
+TEST_F(GrpcClientFactoryTest, CreateClientReturnsNonNull) {
+  Api::ApiPtr api = Api::createApiForTest();
+  Event::DispatcherPtr dispatcher = api->allocateDispatcher("test");
+  ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher));
+  Stats::IsolatedStoreImpl stats_store;
+  ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store.rootScope()));
+
+  GrpcConfigProto config;
+  config.set_cluster("test_cluster");
+
+  auto client = factory_->createClient(context_, config);
+  EXPECT_NE(nullptr, client);
+}
+
+} // namespace ReverseConnection
+} // namespace Bootstrap
+} // namespace Extensions
+} // namespace Envoy
diff --git a/contrib/reverse_tunnel_reporter/test/reporters/BUILD b/contrib/reverse_tunnel_reporter/test/reporters/BUILD
new file mode 100644
index 0000000000000..991e168a0ae8e
--- /dev/null
+++ b/contrib/reverse_tunnel_reporter/test/reporters/BUILD
@@ -0,0 +1,24 @@
+load(
+    "//bazel:envoy_build_system.bzl",
+    "envoy_cc_test",
+    "envoy_contrib_package",
+)
+
+licenses(["notice"])  # Apache 2
+
+envoy_contrib_package()
+
+envoy_cc_test(
+    name = "event_reporter_test",
+    srcs = ["event_reporter_test.cc"],
+    deps = [
+        "//contrib/reverse_tunnel_reporter/source/reporters/event_reporter:event_reporter_lib",
+        "//source/common/api:api_lib",
+        "//source/common/config:utility_lib",
+        "//test/mocks:common_lib",
+        "//test/mocks/server:server_mocks",
+        "//test/test_common:registry_lib",
+        "//test/test_common:test_time_lib",
+        "//test/test_common:utility_lib",
+    ],
+)
diff --git a/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc b/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc
new file mode 100644
index 0000000000000..fd869977b94c4
--- /dev/null
+++ b/contrib/reverse_tunnel_reporter/test/reporters/event_reporter_test.cc
@@ -0,0 +1,567 @@
+#include "source/common/api/api_impl.h"
+
+#include "test/mocks/common.h"
+#include "test/mocks/server/server_factory_context.h"
+#include "test/test_common/registry.h"
+#include "test/test_common/test_time.h"
+#include "test/test_common/utility.h"
+
+#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/factory.h"
+#include "contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h"
"contrib/reverse_tunnel_reporter/source/reporters/event_reporter/reporter.h" +#include "contrib/reverse_tunnel_reporter/source/reverse_tunnel_event_types.h" +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; +using testing::ReturnRef; + +namespace Envoy { +namespace Extensions { +namespace Bootstrap { +namespace ReverseConnection { + +class MockReverseTunnelReporterClient : public ReverseTunnelReporterClient { +public: + MockReverseTunnelReporterClient() = default; + ~MockReverseTunnelReporterClient() override = default; + + MOCK_METHOD(void, onServerInitialized, (ReverseTunnelReporterWithState*), (override)); + MOCK_METHOD(void, receiveEvents, (ReverseTunnelEvent::BatchedEvents), (override)); +}; + +class EventReporterTest : public testing::Test { +protected: + void SetUp() override { + api_ = Api::createApiForTest(); + dispatcher_ = api_->allocateDispatcher("test_thread"); + + ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher_)); + ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store_.rootScope())); + ON_CALL(context_, messageValidationVisitor()) + .WillByDefault(ReturnRef(ProtobufMessage::getStrictValidationVisitor())); + + auto mock_client1 = std::make_unique>(); + auto mock_client2 = std::make_unique>(); + mock_client1_ = mock_client1.get(); + mock_client2_ = mock_client2.get(); + + std::vector clients; + clients.push_back(std::move(mock_client1)); + clients.push_back(std::move(mock_client2)); + + EventReporter::ConfigProto config; + config.set_stat_prefix("test_prefix"); + + reporter_ = std::make_unique(context_, config, std::move(clients)); + } + + void createTestConnection(const std::string& node_id, const std::string& cluster_id, + const std::string& tenant_id = "tenant1") { + reporter_->reportConnectionEvent(node_id, cluster_id, tenant_id); + } + + void createTestDisconnection(const std::string& node_id, const std::string& cluster_id) { + reporter_->reportDisconnectionEvent(node_id, cluster_id); + } + + void runDispatcher() { dispatcher_->run(Event::Dispatcher::RunType::NonBlock); } + + uint64_t getCounterValue(const std::string& name) { + return stats_store_.counterFromString("test_prefix." + name).value(); + } + + uint64_t getGaugeValue(const std::string& name) { + return stats_store_.gaugeFromString("test_prefix." 
+
+  Api::ApiPtr api_;
+  Event::DispatcherPtr dispatcher_;
+  NiceMock<Server::Configuration::MockServerFactoryContext> context_;
+  Stats::TestUtil::TestStore stats_store_;
+  NiceMock<MockReverseTunnelReporterClient>* mock_client1_;
+  NiceMock<MockReverseTunnelReporterClient>* mock_client2_;
+  std::unique_ptr<EventReporter> reporter_;
+};
+
+TEST_F(EventReporterTest, AddRemoveConnections) {
+  EXPECT_CALL(*mock_client1_, receiveEvents(_))
+      .Times(4)
+      .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) {
+        EXPECT_EQ(1, batch.connections.size());
+        EXPECT_EQ(0, batch.disconnections.size());
+        EXPECT_EQ("node1", batch.connections[0]->node_id);
+      }))
+      .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) {
+        EXPECT_EQ(1, batch.connections.size());
+        EXPECT_EQ(0, batch.disconnections.size());
+        EXPECT_EQ("node2", batch.connections[0]->node_id);
+      }))
+      .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) {
+        EXPECT_EQ(0, batch.connections.size());
+        EXPECT_EQ(1, batch.disconnections.size());
+        EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name);
+      }))
+      .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) {
+        EXPECT_EQ(0, batch.connections.size());
+        EXPECT_EQ(1, batch.disconnections.size());
+        EXPECT_EQ(ReverseTunnelEvent::getName("node2", "cluster2"), batch.disconnections[0]->name);
+      }));
+
+  EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(4);
+
+  createTestConnection("node1", "cluster1");
+  runDispatcher();
+
+  EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total"));
+  EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active"));
+  EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active"));
+
+  createTestConnection("node2", "cluster2");
+  runDispatcher();
+
+  EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total"));
+  EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active"));
+  EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active"));
+
+  auto connections = reporter_->getAllConnections();
+  EXPECT_EQ(2, connections.size());
+  EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total"));
+
+  connections = reporter_->getAllConnections();
+  EXPECT_EQ(2, connections.size());
+  EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total"));
+
+  createTestDisconnection("node1", "cluster1");
+  runDispatcher();
+
+  EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total"));
+  EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active"));
+  EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active"));
+
+  createTestDisconnection("node2", "cluster2");
+  runDispatcher();
+
+  EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total"));
+  EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active"));
+  EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active"));
+
+  connections = reporter_->getAllConnections();
+  EXPECT_EQ(0, connections.size());
+  EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total"));
+}
+
+TEST_F(EventReporterTest, DuplicateConnectionHandling) {
+  EXPECT_CALL(*mock_client1_, receiveEvents(_))
+      .Times(2)
+      .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) {
+        EXPECT_EQ(1, batch.connections.size());
+        EXPECT_EQ(0, batch.disconnections.size());
+        EXPECT_EQ("node1", batch.connections[0]->node_id);
+      }))
+      .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) {
+        EXPECT_EQ(0, batch.connections.size());
+        EXPECT_EQ(1, batch.disconnections.size());
+        EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name);
+      }));
+
+  EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(2);
+
+ 
createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ("node1", connections[0]->node_id); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ("node1", connections[0]->node_id); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +TEST_F(EventReporterTest, PullsBeforeConnectionEvents) { + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +TEST_F(EventReporterTest, RemoveNonExistentConnection) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::warn); + MockLogSink sink(Envoy::Logger::Registry::getSink()); + + EXPECT_CALL(sink, log(_, _)) + .WillOnce(Invoke([](absl::string_view, const spdlog::details::log_msg& msg) { + EXPECT_EQ(spdlog::level::warn, msg.level); + })); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)).Times(0); + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(0); + + createTestDisconnection("nonexistent", "connection"); + runDispatcher(); + + EXPECT_EQ(0, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + })); + EXPECT_CALL(*mock_client2_, receiveEvents(_)); + + createTestConnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, OnServerInitialized) { + Envoy::Logger::Registry::setLogLevel(spdlog::level::info); + MockLogSink 
sink(Envoy::Logger::Registry::getSink()); + + EXPECT_CALL(sink, log(_, _)) + .WillOnce(Invoke([](absl::string_view, const spdlog::details::log_msg& msg) { + EXPECT_EQ(spdlog::level::info, msg.level); + })); + + EXPECT_CALL(*mock_client1_, onServerInitialized(_)); + EXPECT_CALL(*mock_client2_, onServerInitialized(_)); + + reporter_->onServerInitialized(); + + EXPECT_EQ(0, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(0, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getCounterValue("reverse_tunnel_full_pulls_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, DefaultStatPrefix) { + EventReporter::ConfigProto config; + + std::vector clients; + auto mock_client1 = std::make_unique>(); + auto mock_client2 = std::make_unique>(); + mock_client1_ = mock_client1.get(); + mock_client2_ = mock_client2.get(); + clients.push_back(std::move(mock_client1)); + clients.push_back(std::move(mock_client2)); + + auto default_reporter = std::make_unique(context_, config, std::move(clients)); + + EXPECT_CALL(*mock_client1_, receiveEvents(_)); + EXPECT_CALL(*mock_client2_, receiveEvents(_)); + + default_reporter->reportConnectionEvent("node1", "cluster1", "tenant1"); + runDispatcher(); + + EXPECT_EQ( + 1, stats_store_.counterFromString("reverse_tunnel_reporter.reverse_tunnel_established_total") + .value()); + EXPECT_EQ(1, stats_store_ + .gaugeFromString("reverse_tunnel_reporter.reverse_tunnel_active", + Stats::Gauge::ImportMode::Accumulate) + .value()); + EXPECT_EQ(1, stats_store_ + .gaugeFromString("reverse_tunnel_reporter.reverse_tunnel_unique_active", + Stats::Gauge::ImportMode::Accumulate) + .value()); + + auto connections = default_reporter->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ( + 1, stats_store_.counterFromString("reverse_tunnel_reporter.reverse_tunnel_full_pulls_total") + .value()); +} + +TEST_F(EventReporterTest, MixedScenario) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(4) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + EXPECT_EQ("tenant_A", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node2", batch.connections[0]->node_id); + EXPECT_EQ("tenant_B", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node2", "cluster2"), batch.disconnections[0]->name); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node3", batch.connections[0]->node_id); + EXPECT_EQ("tenant_C", batch.connections[0]->tenant_id); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(4); + + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node2", "cluster2", 
"tenant_B"); + runDispatcher(); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(3, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node2", "cluster2", "tenant_B"); + runDispatcher(); + EXPECT_EQ(4, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(4, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(3, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node2", "cluster2"); + runDispatcher(); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + createTestDisconnection("node2", "cluster2"); + runDispatcher(); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + createTestConnection("node3", "cluster3", "tenant_C"); + runDispatcher(); + EXPECT_EQ(5, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(2, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); + + EXPECT_EQ(5, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(2, getGaugeValue("reverse_tunnel_unique_active")); +} + +TEST_F(EventReporterTest, LargeDuplicateCount) { + EXPECT_CALL(*mock_client1_, receiveEvents(_)) + .Times(2) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(1, batch.connections.size()); + EXPECT_EQ(0, batch.disconnections.size()); + EXPECT_EQ("node1", batch.connections[0]->node_id); + EXPECT_EQ("tenant_A", batch.connections[0]->tenant_id); + })) + .WillOnce(Invoke([](const ReverseTunnelEvent::BatchedEvents& batch) { + EXPECT_EQ(0, batch.connections.size()); + EXPECT_EQ(1, batch.disconnections.size()); + EXPECT_EQ(ReverseTunnelEvent::getName("node1", "cluster1"), batch.disconnections[0]->name); + })); + + EXPECT_CALL(*mock_client2_, receiveEvents(_)).Times(2); + + const int DUPLICATE_COUNT = 50; + + for (int i = 0; i < DUPLICATE_COUNT; i++) { + createTestConnection("node1", "cluster1", "tenant_A"); + runDispatcher(); + } + + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(DUPLICATE_COUNT, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, 
getGaugeValue("reverse_tunnel_unique_active")); + + auto connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ(1, getCounterValue("reverse_tunnel_full_pulls_total")); + + for (int i = 0; i < DUPLICATE_COUNT - 1; i++) { + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + } + + EXPECT_EQ(DUPLICATE_COUNT - 1, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(1, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(1, connections.size()); + EXPECT_EQ(2, getCounterValue("reverse_tunnel_full_pulls_total")); + + createTestDisconnection("node1", "cluster1"); + runDispatcher(); + + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_established_total")); + EXPECT_EQ(DUPLICATE_COUNT, getCounterValue("reverse_tunnel_closed_total")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_active")); + EXPECT_EQ(0, getGaugeValue("reverse_tunnel_unique_active")); + + connections = reporter_->getAllConnections(); + EXPECT_EQ(0, connections.size()); + EXPECT_EQ(3, getCounterValue("reverse_tunnel_full_pulls_total")); +} + +// --- Factory tests --- + +class MockReverseTunnelReporterClientFactory : public ReverseTunnelReporterClientFactory { +public: + MOCK_METHOD(ReverseTunnelReporterClientPtr, createClient, + (Server::Configuration::ServerFactoryContext&, const Protobuf::Message&), (override)); + + std::string name() const override { return "mock_client_factory"; } + ProtobufTypes::MessagePtr createEmptyConfigProto() override { + return std::make_unique(); + } +}; + +class EventReporterFactoryTest : public testing::Test { +protected: + void SetUp() override { + ON_CALL(context_, messageValidationVisitor()) + .WillByDefault(ReturnRef(ProtobufMessage::getStrictValidationVisitor())); + ON_CALL(context_, scope()).WillByDefault(ReturnRef(*stats_store_.rootScope())); + + api_ = Api::createApiForTest(); + dispatcher_ = api_->allocateDispatcher("test_thread"); + ON_CALL(context_, mainThreadDispatcher()).WillByDefault(ReturnRef(*dispatcher_)); + } + + Api::ApiPtr api_; + Event::DispatcherPtr dispatcher_; + NiceMock context_; + Stats::TestUtil::TestStore stats_store_; + EventReporterFactory factory_; +}; + +TEST_F(EventReporterFactoryTest, Name) { + EXPECT_EQ( + "envoy.extensions.reverse_tunnel.reverse_tunnel_reporting_service.reporters.event_reporter", + factory_.name()); +} + +TEST_F(EventReporterFactoryTest, CreateEmptyConfigProto) { + auto config = factory_.createEmptyConfigProto(); + EXPECT_NE(nullptr, config); + EXPECT_NE( + nullptr, + dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig*>( + config.get())); +} + +TEST_F(EventReporterFactoryTest, CreateReporterWithRegisteredClient) { + MockReverseTunnelReporterClientFactory mock_client_factory; + Registry::InjectFactory registered(mock_client_factory); + + EXPECT_CALL(mock_client_factory, createClient(_, _)) + .WillOnce(Invoke([](Server::Configuration::ServerFactoryContext&, + const Protobuf::Message&) -> ReverseTunnelReporterClientPtr { + return std::make_unique>(); + })); + + auto config = factory_.createEmptyConfigProto(); + auto& reporter_config = dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig&>( + *config); + reporter_config.set_stat_prefix("test"); + + auto* client_entry = reporter_config.add_clients(); + client_entry->set_name("mock_client_factory"); + 
client_entry->mutable_typed_config()->PackFrom(Protobuf::Struct()); + + auto reporter = factory_.createReporter(context_, std::move(config)); + EXPECT_NE(nullptr, reporter); +} + +TEST_F(EventReporterFactoryTest, CreateClientWithUnknownFactoryThrows) { + auto config = factory_.createEmptyConfigProto(); + auto& reporter_config = dynamic_cast< + envoy::extensions::reverse_tunnel_reporters::v3alpha::reporters::EventReporterConfig&>( + *config); + reporter_config.set_stat_prefix("test"); + + auto* client_entry = reporter_config.add_clients(); + client_entry->set_name("nonexistent_client_factory"); + client_entry->mutable_typed_config()->PackFrom(Protobuf::Struct()); + + EXPECT_THROW_WITH_REGEX(factory_.createReporter(context_, std::move(config)), EnvoyException, + "Unknown Reporter Client Factory: 'nonexistent_client_factory'"); +} + +} // namespace ReverseConnection +} // namespace Bootstrap +} // namespace Extensions +} // namespace Envoy diff --git a/docs/root/api-v3/config/contrib/contrib.rst b/docs/root/api-v3/config/contrib/contrib.rst index ac17d06076276..5ce1c24cdb09a 100644 --- a/docs/root/api-v3/config/contrib/contrib.rst +++ b/docs/root/api-v3/config/contrib/contrib.rst @@ -21,3 +21,4 @@ Contrib extensions tap_sinks/tap_sinks load_balancing_policies/peak_ewma/peak_ewma istio/istio + reverse_tunnel_reporter/reverse_tunnel_reporter diff --git a/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst b/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst new file mode 100644 index 0000000000000..cc06a2bf352fa --- /dev/null +++ b/docs/root/api-v3/config/contrib/reverse_tunnel_reporter/reverse_tunnel_reporter.rst @@ -0,0 +1,9 @@ +Reverse tunnel reporter +======================= + +.. toctree:: + :glob: + :maxdepth: 2 + + ../../../extensions/reverse_tunnel_reporters/v3alpha/clients/grpc_client/* + ../../../extensions/reverse_tunnel_reporters/v3alpha/reporters/*