Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type Config struct {
WsURL string // Websocket Api url

WsHA bool // Use concurrent connections to multiple Streams servers
WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Expand Down Expand Up @@ -292,6 +293,7 @@ func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
type Stats struct {
Accepted uint64 // Total number of accepted reports
Deduplicated uint64 // Total number of deduplicated reports when in HA
OutOfOrder uint64 // Total number of out-of-order reports seen
Comment thread
ro-tex marked this conversation as resolved.
TotalReceived uint64 // Total number of received reports
PartialReconnects uint64 // Total number of partial reconnects when in HA
FullReconnects uint64 // Total number of full reconnects
Expand Down
1 change: 1 addition & 0 deletions go/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
WsURL string // Websocket Api url
wsURL *url.URL // Websocket Api url
WsHA bool // Use concurrent connections to multiple Streams servers
WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Expand Down
37 changes: 29 additions & 8 deletions go/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Stream interface {
type Stats struct {
Accepted uint64 // Total number of accepted reports
Deduplicated uint64 // Total number of deduplicated reports when in HA
OutOfOrder uint64 // Total number of out-of-order reports seen
TotalReceived uint64 // Total number of received reports
PartialReconnects uint64 // Total number of partial reconnects when in HA
FullReconnects uint64 // Total number of full reconnects
Expand All @@ -63,8 +64,8 @@ type Stats struct {

func (s Stats) String() (st string) {
return fmt.Sprintf(
"accepted: %d, deduplicated: %d, total_received %d, partial_reconnects: %d, full_reconnects: %d, configured_connections: %d, active_connections %d",
s.Accepted, s.Deduplicated,
"accepted: %d, deduplicated: %d, out_of_order: %d, total_received %d, partial_reconnects: %d, full_reconnects: %d, configured_connections: %d, active_connections %d",
s.Accepted, s.Deduplicated, s.OutOfOrder,
s.TotalReceived, s.PartialReconnects,
s.FullReconnects, s.ConfiguredConnections, s.ActiveConnections,
)
Expand All @@ -88,6 +89,7 @@ type stream struct {
stats struct {
accepted atomic.Uint64
skipped atomic.Uint64
outOfOrder atomic.Uint64
partialReconnects atomic.Uint64
fullReconnects atomic.Uint64
activeConnections atomic.Uint64
Expand Down Expand Up @@ -310,6 +312,7 @@ func (s *stream) newWSconnWithRetry(origin string) (conn *wsConn, err error) {
func (s *stream) Stats() (st Stats) {
st.Accepted = s.stats.accepted.Load()
st.Deduplicated = s.stats.skipped.Load()
st.OutOfOrder = s.stats.outOfOrder.Load()
st.TotalReceived = st.Accepted + st.Deduplicated
st.PartialReconnects = s.stats.partialReconnects.Load()
st.FullReconnects = s.stats.fullReconnects.Load()
Expand Down Expand Up @@ -359,17 +362,35 @@ func (s *stream) Close() (err error) {

func (s *stream) accept(ctx context.Context, m *message) (err error) {
id := m.Report.FeedID.String()
ts := m.Report.ObservationsTimestamp

s.waterMarkMu.Lock()
// Skip older reports and reports with the same timestamp
if !m.Report.ObservationsTimestamp.After(s.waterMark[id]) {
s.stats.skipped.Add(1)
s.waterMarkMu.Unlock()
return nil
wm := s.waterMark[id]

if s.config.WsAllowOutOfOrder {
if ts.Equal(wm) {
s.stats.skipped.Add(1)
s.waterMarkMu.Unlock()
return nil
}
Comment thread
ro-tex marked this conversation as resolved.
Outdated
if ts.Before(wm) {
s.stats.outOfOrder.Add(1)
} else {
s.waterMark[id] = ts
}
} else {
if !ts.After(wm) {
s.stats.skipped.Add(1)
if ts.Before(wm) {
s.stats.outOfOrder.Add(1)
}
s.waterMarkMu.Unlock()
return nil
}
s.waterMark[id] = ts
}

s.stats.accepted.Add(1)
s.waterMark[id] = m.Report.ObservationsTimestamp
s.waterMarkMu.Unlock()

select {
Expand Down
157 changes: 157 additions & 0 deletions go/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,163 @@ func TestClient_StreamHA_OneOriginDown(t *testing.T) {

}

func TestClient_StreamOutOfOrder(t *testing.T) {
reports := []*ReportResponse{
{FeedID: feed1, ObservationsTimestamp: time.Unix(100, 0)},
{FeedID: feed1, ObservationsTimestamp: time.Unix(102, 0)},
{FeedID: feed1, ObservationsTimestamp: time.Unix(101, 0)}, // out-of-order
}

ms := newMockServer(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
return
}

conn, err := websocket.Accept(
w, r, &websocket.AcceptOptions{CompressionMode: websocket.CompressionContextTakeover},
)
if err != nil {
t.Fatalf("error accepting connection: %s", err)
}
defer func() { _ = conn.CloseNow() }()

for _, rpt := range reports {
b, err := json.Marshal(&message{rpt})
if err != nil {
t.Errorf("failed to serialize message: %s", err)
}
if err := conn.Write(context.Background(), websocket.MessageBinary, b); err != nil {
t.Errorf("failed to write message: %s", err)
}
}

for conn.Ping(context.Background()) == nil {
time.Sleep(100 * time.Millisecond)
}
})
defer ms.Close()

streamsClient, err := ms.Client()
if err != nil {
t.Fatalf("error creating client %s", err)
}

cc := streamsClient.(*client)
cc.config.Logger = LogPrintf
cc.config.LogDebug = true
cc.config.WsAllowOutOfOrder = true

sub, err := streamsClient.Stream(context.Background(), []feed.ID{feed1})
if err != nil {
t.Fatalf("error subscribing %s", err)
}
defer sub.Close()

var received []*ReportResponse
for i := 0; i < len(reports); i++ {
rep, err := sub.Read(context.Background())
if err != nil {
t.Fatalf("error reading report %s", err)
}
received = append(received, rep)
}

if !reportResponsesEqual(received, reports) {
t.Errorf("Read() = %v, want %v", received, reports)
}

stats := sub.Stats()
if stats.Accepted != 3 {
t.Errorf("stats.Accepted = %d, want 3", stats.Accepted)
}
if stats.OutOfOrder != 1 {
t.Errorf("stats.OutOfOrder = %d, want 1", stats.OutOfOrder)
}
if stats.Deduplicated != 0 {
t.Errorf("stats.Deduplicated = %d, want 0", stats.Deduplicated)
}
}

func TestClient_StreamOutOfOrder_DefaultDrop(t *testing.T) {
reports := []*ReportResponse{
{FeedID: feed1, ObservationsTimestamp: time.Unix(100, 0)},
{FeedID: feed1, ObservationsTimestamp: time.Unix(102, 0)},
{FeedID: feed1, ObservationsTimestamp: time.Unix(101, 0)}, // out-of-order, should be dropped
}

ms := newMockServer(func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
return
}

conn, err := websocket.Accept(
w, r, &websocket.AcceptOptions{CompressionMode: websocket.CompressionContextTakeover},
)
if err != nil {
t.Fatalf("error accepting connection: %s", err)
}
defer func() { _ = conn.CloseNow() }()

for _, rpt := range reports {
b, err := json.Marshal(&message{rpt})
if err != nil {
t.Errorf("failed to serialize message: %s", err)
}
if err := conn.Write(context.Background(), websocket.MessageBinary, b); err != nil {
t.Errorf("failed to write message: %s", err)
}
}

for conn.Ping(context.Background()) == nil {
time.Sleep(100 * time.Millisecond)
}
})
defer ms.Close()

streamsClient, err := ms.Client()
if err != nil {
t.Fatalf("error creating client %s", err)
}

cc := streamsClient.(*client)
cc.config.Logger = LogPrintf
cc.config.LogDebug = true

sub, err := streamsClient.Stream(context.Background(), []feed.ID{feed1})
if err != nil {
t.Fatalf("error subscribing %s", err)
}
defer sub.Close()

var received []*ReportResponse
for i := 0; i < 2; i++ {
rep, err := sub.Read(context.Background())
if err != nil {
t.Fatalf("error reading report %s", err)
}
received = append(received, rep)
}

// Wait for the third message to be processed by accept (it should be dropped)
time.Sleep(50 * time.Millisecond)

expectedDelivered := reports[:2]
if !reportResponsesEqual(received, expectedDelivered) {
t.Errorf("Read() = %v, want %v", received, expectedDelivered)
}

stats := sub.Stats()
if stats.Accepted != 2 {
t.Errorf("stats.Accepted = %d, want 2", stats.Accepted)
}
if stats.Deduplicated != 1 {
t.Errorf("stats.Deduplicated = %d, want 1", stats.Deduplicated)
}
if stats.OutOfOrder != 1 {
t.Errorf("stats.OutOfOrder = %d, want 1", stats.OutOfOrder)
}
}

// Tests that when in HA mode both origins are up after a recovery period even if one origin is down on initial connection
func TestClient_StreamHA_OneOriginDownRecovery(t *testing.T) {
connectAttempts := &atomic.Uint64{}
Expand Down
13 changes: 13 additions & 0 deletions rust/crates/sdk/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ pub struct Config {
/// High Availability Mode: Use concurrent connections to multiple Streams servers
pub ws_ha: WebSocketHighAvailability,

/// Allow out-of-order reports through while still deduplicating HA duplicates
pub ws_allow_out_of_order: bool,

/// Maximum number of reconnection attempts for underlying WebSocket connections
pub ws_max_reconnect: usize,

Expand All @@ -65,6 +68,7 @@ pub struct Config {
impl Config {
const DEFAULT_WS_MAX_RECONNECT: usize = 5;
const DEFAULT_WS_HA: WebSocketHighAvailability = WebSocketHighAvailability::Disabled;
const DEFAULT_WS_ALLOW_OUT_OF_ORDER: bool = false;
const DEFAULT_INSECURE_SKIP_VERIFY: InsecureSkipVerify = InsecureSkipVerify::Disabled;
const DEFAULT_INSPECT_HTTP_RESPONSE: Option<fn(&Response)> = None;

Expand Down Expand Up @@ -140,6 +144,7 @@ impl Config {
rest_url,
ws_url,
ws_ha: Self::DEFAULT_WS_HA,
ws_allow_out_of_order: Self::DEFAULT_WS_ALLOW_OUT_OF_ORDER,
ws_max_reconnect: Self::DEFAULT_WS_MAX_RECONNECT,
insecure_skip_verify: Self::DEFAULT_INSECURE_SKIP_VERIFY,
inspect_http_response: Self::DEFAULT_INSPECT_HTTP_RESPONSE,
Expand All @@ -160,6 +165,7 @@ pub struct ConfigBuilder {
rest_url: String,
ws_url: String,
ws_ha: WebSocketHighAvailability,
ws_allow_out_of_order: bool,
ws_max_reconnect: usize,
insecure_skip_verify: InsecureSkipVerify,
inspect_http_response: Option<fn(&Response)>,
Expand All @@ -172,6 +178,12 @@ impl ConfigBuilder {
self
}

/// Sets the `ws_allow_out_of_order` parameter.
pub fn with_ws_allow_out_of_order(mut self, ws_allow_out_of_order: bool) -> Self {
self.ws_allow_out_of_order = ws_allow_out_of_order;
self
}

// Sets the `ws_max_reconnect` parameter.
pub fn with_ws_max_reconnect(mut self, ws_max_reconnect: usize) -> Self {
self.ws_max_reconnect = ws_max_reconnect;
Expand Down Expand Up @@ -206,6 +218,7 @@ impl ConfigBuilder {
rest_url: self.rest_url,
ws_url: self.ws_url,
ws_ha: self.ws_ha,
ws_allow_out_of_order: self.ws_allow_out_of_order,
ws_max_reconnect: self.ws_max_reconnect,
insecure_skip_verify: self.insecure_skip_verify,
inspect_http_response: self.inspect_http_response,
Expand Down
6 changes: 6 additions & 0 deletions rust/crates/sdk/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ struct Stats {
accepted: AtomicUsize,
/// Total number of deduplicated reports when in HA
deduplicated: AtomicUsize,
/// Total number of out-of-order reports seen
out_of_order: AtomicUsize,
/// Total number of partial reconnects when in HA
partial_reconnects: AtomicUsize,
/// Total number of full reconnects
Expand Down Expand Up @@ -135,6 +137,7 @@ impl Stream {
let stats = Arc::new(Stats {
accepted: AtomicUsize::new(0),
deduplicated: AtomicUsize::new(0),
out_of_order: AtomicUsize::new(0),
partial_reconnects: AtomicUsize::new(0),
full_reconnects: AtomicUsize::new(0),
configured_connections: AtomicUsize::new(0),
Expand Down Expand Up @@ -257,6 +260,7 @@ impl Stream {
StatsSnapshot {
accepted,
deduplicated,
out_of_order: self.stats.out_of_order.load(Ordering::SeqCst),
total_received: accepted + deduplicated,
partial_reconnects: self.stats.partial_reconnects.load(Ordering::SeqCst),
full_reconnects: self.stats.full_reconnects.load(Ordering::SeqCst),
Expand All @@ -273,6 +277,8 @@ pub struct StatsSnapshot {
pub accepted: usize,
/// Total number of deduplicated reports when in HA
pub deduplicated: usize,
/// Total number of out-of-order reports seen
pub out_of_order: usize,
/// Total number of received reports
pub total_received: usize,
/// Total number of partial reconnects when in HA
Expand Down
33 changes: 29 additions & 4 deletions rust/crates/sdk/src/stream/monitor_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,41 @@ pub(crate) async fn run_stream(
let feed_id = report.report.feed_id.to_hex_string();
let observations_timestamp = report.report.observations_timestamp;

if water_mark.lock().await.contains_key(&feed_id) && water_mark.lock().await[&feed_id] >= observations_timestamp {
stats.deduplicated.fetch_add(1, Ordering::SeqCst);
continue;
let mut wm = water_mark.lock().await;
let current_wm = wm.get(&feed_id).copied();

if let Some(current) = current_wm {
if config.ws_allow_out_of_order {
if observations_timestamp == current {
stats.deduplicated.fetch_add(1, Ordering::SeqCst);
drop(wm);
continue;
}
if observations_timestamp < current {
stats.out_of_order.fetch_add(1, Ordering::SeqCst);
} else {
wm.insert(feed_id.clone(), observations_timestamp);
}
} else {
if observations_timestamp <= current {
stats.deduplicated.fetch_add(1, Ordering::SeqCst);
if observations_timestamp < current {
stats.out_of_order.fetch_add(1, Ordering::SeqCst);
}
drop(wm);
continue;
}
wm.insert(feed_id.clone(), observations_timestamp);
}
} else {
wm.insert(feed_id.clone(), observations_timestamp);
}
drop(wm);

report_sender.send(report).await.map_err(|e| {
StreamError::ConnectionError(format!("Failed to send report: {}", e))
})?;

water_mark.lock().await.insert(feed_id, observations_timestamp);
stats.accepted.fetch_add(1, Ordering::SeqCst);

} else {
Expand Down
Loading
Loading