From e8491daaee8afad75a133c0a088e07afc70470df Mon Sep 17 00:00:00 2001 From: Jason Koch Date: Wed, 22 Apr 2026 23:00:34 +0000 Subject: [PATCH] feat: expose a Flush api for caller to issue a flush on thread-local endpoint --- spectator/publisher.cc | 37 ++++++++++++++++++++++--------------- spectator/publisher.h | 11 +++++++++++ spectator/registry.h | 2 ++ 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/spectator/publisher.cc b/spectator/publisher.cc index bd553d8..d87c0d5 100644 --- a/spectator/publisher.cc +++ b/spectator/publisher.cc @@ -57,29 +57,36 @@ void SpectatordPublisher::setup_unix_domain(absl::string_view path) { local_reconnect(path); // get a copy of the file path std::string local_path{path}; - sender_ = [local_path, this](std::string_view msg) { + + flusher_ = [local_path, this]() { + if (buffer_.empty()) return; + const auto now = std::chrono::steady_clock::now(); + for (auto i = 0; i < 3; ++i) { + try { + auto sent_bytes = local_socket_.send(asio::buffer(buffer_)); + logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length()); + last_flush_time_ = now; + break; + } catch (std::exception& e) { + local_reconnect(local_path); + logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i, + e.what()); + } + } + buffer_.clear(); + }; + + sender_ = [this](std::string_view msg) { buffer_.append(msg); const auto now = std::chrono::steady_clock::now(); const bool should_flush = buffer_.length() >= bytes_to_buffer_ || now - last_flush_time_ >= flush_interval_; if (should_flush) { - for (auto i = 0; i < 3; ++i) { - try { - auto sent_bytes = local_socket_.send(asio::buffer(buffer_)); - logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length()); - last_flush_time_ = now; - break; - } catch (std::exception& e) { - local_reconnect(local_path); - logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i, - e.what()); - } - } - buffer_.clear(); + flusher_(); } else { buffer_.push_back(NEW_LINE); - } + } }; } diff --git a/spectator/publisher.h b/spectator/publisher.h index a27bd54..3008f7c 100644 --- a/spectator/publisher.h +++ b/spectator/publisher.h @@ -7,6 +7,14 @@ namespace spectator { +// Thread safety: not thread-safe. All calls to send() and flush() must come +// from the same thread. The unix-domain buffered path shares mutable buffer_ +// and last_flush_time_ without locking; the UDP path is incidentally safe +// (one atomic datagram per send) but that isn't a supported guarantee. +// +// This single-threaded contract matches the proxyd/Envoy use case, where +// meter sends and the flush() timer tick both run on the same dispatcher +// thread. Other callers must serialize externally. class SpectatordPublisher { public: explicit SpectatordPublisher( @@ -17,10 +25,13 @@ class SpectatordPublisher { SpectatordPublisher(const SpectatordPublisher&) = delete; void send(std::string_view measurement) { sender_(measurement); }; + void flush() { flusher_(); }; protected: using sender_fun = std::function; sender_fun sender_; + using flusher_fun = std::function; + flusher_fun flusher_ = []() {}; private: void setup_nop_sender(); diff --git a/spectator/registry.h b/spectator/registry.h index 4f0d102..f448e43 100644 --- a/spectator/registry.h +++ b/spectator/registry.h @@ -326,6 +326,8 @@ class SpectatordRegistry state_.publisher = std::make_unique(config.endpoint, config.bytes_to_buffer, config.flush_interval, logger_); } + + void Flush() { state_.publisher->flush(); } }; /// A Registry that can be used for tests. It keeps state about which meters