Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type Config struct {
WsURL string // Websocket Api url

WsHA bool // Use concurrent connections to multiple Streams servers
WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Expand Down Expand Up @@ -292,6 +293,7 @@ func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
type Stats struct {
Accepted uint64 // Total number of accepted reports
Deduplicated uint64 // Total number of deduplicated reports when in HA
OutOfOrder uint64 // Total number of out-of-order reports seen
Comment thread
ro-tex marked this conversation as resolved.
TotalReceived uint64 // Total number of received reports
PartialReconnects uint64 // Total number of partial reconnects when in HA
FullReconnects uint64 // Total number of full reconnects
Expand Down
1 change: 1 addition & 0 deletions go/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Config struct {
WsURL string // Websocket Api url
wsURL *url.URL // Websocket Api url
WsHA bool // Use concurrent connections to multiple Streams servers
WsAllowOutOfOrder bool // Allow out-of-order reports through while still deduplicating HA duplicates
WsMaxReconnect int // Maximum number of reconnection attempts for Stream underlying connections
LogDebug bool // Log debug information
InsecureSkipVerify bool // Skip server certificate chain and host name verification
Expand Down
59 changes: 59 additions & 0 deletions go/dedup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package streams

// seenBufferSize is the number of most recently recorded timestamps that are
// remembered per feed for duplicate detection.
const seenBufferSize = 32

// Verdict is the classification Check assigns to a report timestamp.
type Verdict int

const (
	// Accept means the timestamp is not in the feed's recently-seen buffer
	// and is not older than the feed's watermark.
	Accept Verdict = iota
	// Duplicate means the timestamp is still present in the feed's
	// recently-seen buffer.
	Duplicate
	// OutOfOrder means the timestamp is not in the recently-seen buffer but
	// is older than the feed's watermark.
	OutOfOrder
)

// feedState is the per-feed bookkeeping used by FeedDeduplicator.
type feedState struct {
	watermark int64                 // highest timestamp recorded so far; meaningful only once count > 0
	ring      [seenBufferSize]int64 // recorded timestamps in insertion order, used as a circular buffer
	set       map[int64]struct{}    // membership index over the live entries of ring
	cursor    int                   // next ring slot to write (the oldest entry once the ring is full)
	count     int                   // number of live ring entries, capped at seenBufferSize
}

// FeedDeduplicator classifies report timestamps per feed as new, duplicate,
// or out of order. Check mutates internal maps without any locking, so
// callers that share a FeedDeduplicator between goroutines must serialize
// calls themselves.
type FeedDeduplicator struct {
	feeds map[string]*feedState
}

// NewFeedDeduplicator returns an empty FeedDeduplicator.
func NewFeedDeduplicator() *FeedDeduplicator {
	return &FeedDeduplicator{feeds: make(map[string]*feedState)}
}

// Check records ts for feedID and reports how it relates to what was already
// recorded for that feed.
//
// It returns Duplicate when ts is among the last seenBufferSize distinct
// timestamps recorded for the feed; duplicates are not recorded again.
// Otherwise ts is recorded (evicting the oldest recorded timestamp when the
// buffer is full) and the result is OutOfOrder when ts is lower than the
// highest timestamp recorded so far, or Accept otherwise. The first timestamp
// seen for a feed is always Accept, whatever its value.
func (d *FeedDeduplicator) Check(feedID string, ts int64) Verdict {
	// Keep the zero value usable instead of panicking on a nil-map write.
	if d.feeds == nil {
		d.feeds = make(map[string]*feedState)
	}

	fs := d.feeds[feedID]
	if fs == nil {
		fs = &feedState{set: make(map[int64]struct{}, seenBufferSize)}
		d.feeds[feedID] = fs
	}

	if _, dup := fs.set[ts]; dup {
		return Duplicate
	}

	// count is zero only before the feed's first recorded timestamp. Using it
	// (rather than watermark == 0) as the "no watermark yet" signal keeps
	// ordering correct for feeds whose timestamps are zero or negative.
	first := fs.count == 0

	if fs.count == seenBufferSize {
		// Ring is full: the slot under the cursor holds the oldest insertion.
		delete(fs.set, fs.ring[fs.cursor])
	} else {
		fs.count++
	}
	fs.ring[fs.cursor] = ts
	fs.set[ts] = struct{}{}
	fs.cursor = (fs.cursor + 1) % seenBufferSize

	if first || ts > fs.watermark {
		fs.watermark = ts
		return Accept
	}
	if ts < fs.watermark {
		return OutOfOrder
	}
	// ts equals the watermark but was already evicted from the buffer.
	return Accept
}
109 changes: 109 additions & 0 deletions go/dedup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package streams

import "testing"

// TestFeedDeduplicator_Accept checks that the first timestamp checked for a
// feed is classified as Accept.
func TestFeedDeduplicator_Accept(t *testing.T) {
	dedup := NewFeedDeduplicator()
	got := dedup.Check("feed-1", 100)
	if got == Accept {
		return
	}
	t.Fatalf("expected Accept, got %d", got)
}

// TestFeedDeduplicator_Duplicate checks that checking the same timestamp
// twice for one feed yields Duplicate on the second call.
func TestFeedDeduplicator_Duplicate(t *testing.T) {
	const (
		feed = "feed-1"
		ts   = int64(100)
	)
	dedup := NewFeedDeduplicator()
	dedup.Check(feed, ts)

	got := dedup.Check(feed, ts)
	if got != Duplicate {
		t.Fatalf("expected Duplicate, got %d", got)
	}
}

// TestFeedDeduplicator_OutOfOrder checks that a not-yet-seen timestamp lower
// than one already recorded for the feed is classified as OutOfOrder.
func TestFeedDeduplicator_OutOfOrder(t *testing.T) {
	const feed = "feed-1"
	dedup := NewFeedDeduplicator()
	dedup.Check(feed, 200)

	got := dedup.Check(feed, 100)
	if got != OutOfOrder {
		t.Fatalf("expected OutOfOrder, got %d", got)
	}
}

// TestFeedDeduplicator_OutOfOrderNotDuplicate checks that an older timestamp
// is OutOfOrder the first time it is delivered and Duplicate the second time.
func TestFeedDeduplicator_OutOfOrderNotDuplicate(t *testing.T) {
	const feed = "feed-1"
	dedup := NewFeedDeduplicator()
	dedup.Check(feed, 200)

	first := dedup.Check(feed, 100)
	if first != OutOfOrder {
		t.Fatalf("expected OutOfOrder for first OOO delivery, got %d", first)
	}

	second := dedup.Check(feed, 100)
	if second != Duplicate {
		t.Fatalf("expected Duplicate for second OOO delivery, got %d", second)
	}
}

// TestFeedDeduplicator_FIFOEviction checks that once the per-feed buffer is
// full, recording one more timestamp evicts only the oldest inserted entry.
func TestFeedDeduplicator_FIFOEviction(t *testing.T) {
	d := NewFeedDeduplicator()
	// Fill the buffer exactly: ts=1 is the oldest insertion.
	for i := int64(1); i <= seenBufferSize; i++ {
		d.Check("feed-1", i)
	}
	// One more distinct timestamp overflows the buffer. It is derived from
	// seenBufferSize so the test keeps exercising eviction if the buffer
	// size is ever changed.
	d.Check("feed-1", seenBufferSize+1)
	// ts=2 (second inserted) should still be in the buffer
	if v := d.Check("feed-1", 2); v != Duplicate {
		t.Fatalf("expected ts=2 still in buffer, got %d", v)
	}
	// ts=1 (first inserted) was evicted by the overflowing timestamp
	if v := d.Check("feed-1", 1); v == Duplicate {
		t.Fatal("expected ts=1 to be evicted (FIFO), but got Duplicate")
	}
}

// TestFeedDeduplicator_FIFOEvictsOldestInsertedNotSmallest checks that
// eviction follows insertion order rather than timestamp value.
func TestFeedDeduplicator_FIFOEvictsOldestInsertedNotSmallest(t *testing.T) {
	const feed = "feed-1"
	dedup := NewFeedDeduplicator()

	// ts=100 goes in first, then 1, 2, 3, ... until the buffer is exactly full.
	dedup.Check(feed, 100)
	for ts := int64(1); ts < seenBufferSize; ts++ {
		dedup.Check(feed, ts)
	}

	// The next new timestamp must push out ts=100 (the oldest insertion),
	// not ts=1 (the smallest value).
	dedup.Check(feed, 999)

	// ts=1 has the smallest value but is not the oldest insertion: it stays.
	smallest := dedup.Check(feed, 1)
	if smallest != Duplicate {
		t.Fatalf("expected ts=1 (smallest value, but not oldest inserted) to remain, got %d", smallest)
	}

	// ts=100 was the oldest insertion: it must be gone.
	oldest := dedup.Check(feed, 100)
	if oldest == Duplicate {
		t.Fatal("expected ts=100 (oldest inserted) to be evicted, but got Duplicate")
	}
}

// TestFeedDeduplicator_IndependentFeeds checks that seen-timestamp state is
// kept separately for each feed ID.
func TestFeedDeduplicator_IndependentFeeds(t *testing.T) {
	const ts = int64(100)
	dedup := NewFeedDeduplicator()
	dedup.Check("feed-a", ts)
	dedup.Check("feed-b", ts)

	gotA := dedup.Check("feed-a", ts)
	if gotA != Duplicate {
		t.Fatalf("expected Duplicate for feed-a, got %d", gotA)
	}

	gotB := dedup.Check("feed-b", ts)
	if gotB != Duplicate {
		t.Fatalf("expected Duplicate for feed-b, got %d", gotB)
	}

	// A feed that has never been checked has no history, so the same
	// timestamp is new for it.
	gotC := dedup.Check("feed-c", ts)
	if gotC != Accept {
		t.Fatalf("expected Accept for new feed-c, got %d", gotC)
	}
}

// TestFeedDeduplicator_WatermarkZeroNotOutOfOrder checks that a feed's first
// report is accepted even when its timestamp is zero.
func TestFeedDeduplicator_WatermarkZeroNotOutOfOrder(t *testing.T) {
	dedup := NewFeedDeduplicator()

	// A first report at ts=0 must be Accept rather than OutOfOrder.
	got := dedup.Check("feed-1", 0)
	if got != Accept {
		t.Fatalf("expected Accept for first report at ts=0, got %d", got)
	}
}

// TestFeedDeduplicator_HADuplicateAfterWatermarkAdvance checks that a
// timestamp seen earlier is still reported as Duplicate (not OutOfOrder)
// after a newer timestamp has been recorded for the same feed.
func TestFeedDeduplicator_HADuplicateAfterWatermarkAdvance(t *testing.T) {
	const feed = "feed-1"
	dedup := NewFeedDeduplicator()
	for _, ts := range []int64{100, 200} { // both new; the second is the higher one
		dedup.Check(feed, ts)
	}

	// The same ts=100 report arrives again, e.g. from a second connection.
	got := dedup.Check(feed, 100)
	if got != Duplicate {
		t.Fatalf("expected Duplicate for HA retransmit, got %d", got)
	}
}
32 changes: 21 additions & 11 deletions go/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Stream interface {
type Stats struct {
Accepted uint64 // Total number of accepted reports
Deduplicated uint64 // Total number of deduplicated reports when in HA
OutOfOrder uint64 // Total number of out-of-order reports seen
TotalReceived uint64 // Total number of received reports
PartialReconnects uint64 // Total number of partial reconnects when in HA
FullReconnects uint64 // Total number of full reconnects
Expand All @@ -63,8 +64,8 @@ type Stats struct {

func (s Stats) String() (st string) {
return fmt.Sprintf(
"accepted: %d, deduplicated: %d, total_received %d, partial_reconnects: %d, full_reconnects: %d, configured_connections: %d, active_connections %d",
s.Accepted, s.Deduplicated,
"accepted: %d, deduplicated: %d, out_of_order: %d, total_received %d, partial_reconnects: %d, full_reconnects: %d, configured_connections: %d, active_connections %d",
s.Accepted, s.Deduplicated, s.OutOfOrder,
s.TotalReceived, s.PartialReconnects,
s.FullReconnects, s.ConfiguredConnections, s.ActiveConnections,
)
Expand All @@ -82,12 +83,13 @@ type stream struct {
closeError atomic.Value
connStatusCallback func(isConneccted bool, host string, origin string)

waterMarkMu sync.Mutex
waterMark map[string]time.Time
feedsMu sync.Mutex
dedup *FeedDeduplicator

stats struct {
accepted atomic.Uint64
skipped atomic.Uint64
outOfOrder atomic.Uint64
partialReconnects atomic.Uint64
fullReconnects atomic.Uint64
activeConnections atomic.Uint64
Expand All @@ -107,7 +109,7 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
config: c.config,
output: make(chan *ReportResponse, 1),
feedIDs: feedIDs,
waterMark: make(map[string]time.Time),
dedup: NewFeedDeduplicator(),
streamCtx: streamCtx,
streamCtxCancel: streamCtxCancel,
}
Expand Down Expand Up @@ -310,6 +312,7 @@ func (s *stream) newWSconnWithRetry(origin string) (conn *wsConn, err error) {
func (s *stream) Stats() (st Stats) {
st.Accepted = s.stats.accepted.Load()
st.Deduplicated = s.stats.skipped.Load()
st.OutOfOrder = s.stats.outOfOrder.Load()
st.TotalReceived = st.Accepted + st.Deduplicated
st.PartialReconnects = s.stats.partialReconnects.Load()
st.FullReconnects = s.stats.fullReconnects.Load()
Expand Down Expand Up @@ -359,18 +362,25 @@ func (s *stream) Close() (err error) {

func (s *stream) accept(ctx context.Context, m *message) (err error) {
id := m.Report.FeedID.String()
ts := m.Report.ObservationsTimestamp.UnixMilli()

s.waterMarkMu.Lock()
// Skip older reports and reports with the same timestamp
if !m.Report.ObservationsTimestamp.After(s.waterMark[id]) {
s.feedsMu.Lock()
verdict := s.dedup.Check(id, ts)
s.feedsMu.Unlock()

switch verdict {
case Duplicate:
s.stats.skipped.Add(1)
s.waterMarkMu.Unlock()
return nil
case OutOfOrder:
s.stats.outOfOrder.Add(1)
if !s.config.WsAllowOutOfOrder {
s.stats.skipped.Add(1)
return nil
}
}

s.stats.accepted.Add(1)
s.waterMark[id] = m.Report.ObservationsTimestamp
s.waterMarkMu.Unlock()

select {
case <-ctx.Done():
Expand Down
Loading