Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions spectator/publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,36 @@ void SpectatordPublisher::setup_unix_domain(absl::string_view path) {
local_reconnect(path);
// get a copy of the file path
std::string local_path{path};
sender_ = [local_path, this](std::string_view msg) {

flusher_ = [local_path, this]() {
if (buffer_.empty()) return;
const auto now = std::chrono::steady_clock::now();
for (auto i = 0; i < 3; ++i) {
try {
auto sent_bytes = local_socket_.send(asio::buffer(buffer_));
logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length());
last_flush_time_ = now;
break;
} catch (std::exception& e) {
local_reconnect(local_path);
logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i,
e.what());
}
}
buffer_.clear();
};

sender_ = [this](std::string_view msg) {
buffer_.append(msg);
const auto now = std::chrono::steady_clock::now();
const bool should_flush = buffer_.length() >= bytes_to_buffer_ ||
now - last_flush_time_ >= flush_interval_;

if (should_flush) {
for (auto i = 0; i < 3; ++i) {
try {
auto sent_bytes = local_socket_.send(asio::buffer(buffer_));
logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length());
last_flush_time_ = now;
break;
} catch (std::exception& e) {
local_reconnect(local_path);
logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i,
e.what());
}
}
buffer_.clear();
flusher_();
} else {
buffer_.push_back(NEW_LINE);
}
}
};
}

Expand Down
11 changes: 11 additions & 0 deletions spectator/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@

namespace spectator {

// Thread safety: not thread-safe. All calls to send() and flush() must come
// from the same thread. The unix-domain buffered path shares mutable buffer_
// and last_flush_time_ without locking; the UDP path is incidentally safe
// (one atomic datagram per send) but that isn't a supported guarantee.
//
// This single-threaded contract matches the proxyd/Envoy use case, where
// meter sends and the flush() timer tick both run on the same dispatcher
// thread. Other callers must serialize externally.
class SpectatordPublisher {
public:
explicit SpectatordPublisher(
Expand All @@ -17,10 +25,13 @@ class SpectatordPublisher {
SpectatordPublisher(const SpectatordPublisher&) = delete;

void send(std::string_view measurement) { sender_(measurement); };
void flush() { flusher_(); };

protected:
using sender_fun = std::function<void(std::string_view)>;
sender_fun sender_;
using flusher_fun = std::function<void()>;
flusher_fun flusher_ = []() {};

private:
void setup_nop_sender();
Expand Down
2 changes: 2 additions & 0 deletions spectator/registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ class SpectatordRegistry
state_.publisher =
std::make_unique<SpectatordPublisher>(config.endpoint, config.bytes_to_buffer, config.flush_interval, logger_);
}

void Flush() { state_.publisher->flush(); }
};

/// A Registry that can be used for tests. It keeps state about which meters
Expand Down
Loading