[Task]: [Go][Prism] Resolve TODO: Optimize watermark reads in stageState using min-heap #38403

@Mathdee

Description

What needs to happen?

In beam/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go, the function minPendingTimestampLocked performs a linear scan over ss.pendingByKeys every time the watermark refreshes.

As noted in the existing comment inside that function:

// TODO(lostluck): Can we figure out how to avoid checking every key on every watermark refresh?
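For context, the current behaviour can be modelled as a plain scan over every key. This is a simplified, hypothetical sketch: the mtime type, the maxTime sentinel, and minPendingScan are illustrative stand-ins. It also assumes each key has already been reduced to its own minimum pending timestamp, which is not necessarily how the real ss.pendingByKeys is laid out.

```go
package main

import (
	"fmt"
	"math"
)

// mtime is an illustrative stand-in for prism's event-time type
// (assumed here to be an int64 of milliseconds).
type mtime int64

// maxTime is a hypothetical sentinel meaning "no pending elements".
const maxTime mtime = math.MaxInt64

// minPendingScan models the current O(K) approach: every watermark refresh
// visits every key to find the global minimum pending timestamp.
// pendingByKey is a simplified, hypothetical stand-in for ss.pendingByKeys.
func minPendingScan(pendingByKey map[string]mtime) mtime {
	minTS := maxTime
	for _, ts := range pendingByKey {
		if ts < minTS {
			minTS = ts
		}
	}
	return minTS
}

func main() {
	pending := map[string]mtime{"a": 30, "b": 10, "c": 20}
	fmt.Println(minPendingScan(pending)) // 10
}
```

The cost of each call grows with the number of keys, even when only one key changed since the last refresh.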


The Fix:
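I will replace the current O(K) iteration over ss.pendingByKeys with a slice-based min-heap. This turns retrieval of the minimum pending timestamp from a linear scan into a constant-time (O(1)) peek at the heap root, at the cost of O(log K) work to maintain the heap whenever a pending timestamp is added, changed, or removed.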
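A minimal sketch of the proposed structure, assuming Go's standard container/heap package plus a side map from key to heap entry, so that a single key's minimum can be updated or removed without searching the heap. The names pendingIndex, keyEntry, Set, Remove, and Min are hypothetical and do not exist in prism; the real change would hook these updates into wherever stageState adds or consumes pending elements.

```go
package main

import (
	"container/heap"
	"fmt"
	"math"
)

// mtime is an illustrative stand-in for prism's event-time type.
type mtime int64

// maxTime is a hypothetical sentinel meaning "no pending elements".
const maxTime mtime = math.MaxInt64

// keyEntry records one key's minimum pending timestamp and its heap position.
type keyEntry struct {
	key   string
	ts    mtime
	index int
}

// keyHeap is a slice-based min-heap ordered by ts; it implements heap.Interface.
type keyHeap []*keyEntry

func (h keyHeap) Len() int           { return len(h) }
func (h keyHeap) Less(i, j int) bool { return h[i].ts < h[j].ts }
func (h keyHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].index = i // keep each entry's position current for Fix/Remove
	h[j].index = j
}
func (h *keyHeap) Push(x any) {
	e := x.(*keyEntry)
	e.index = len(*h)
	*h = append(*h, e)
}
func (h *keyHeap) Pop() any {
	old := *h
	n := len(old)
	e := old[n-1]
	old[n-1] = nil // drop the reference so the entry can be collected
	e.index = -1
	*h = old[:n-1]
	return e
}

// pendingIndex pairs the heap with a key->entry map (hypothetical design).
type pendingIndex struct {
	h     keyHeap
	byKey map[string]*keyEntry
}

func newPendingIndex() *pendingIndex {
	return &pendingIndex{byKey: make(map[string]*keyEntry)}
}

// Set records or updates key's minimum pending timestamp in O(log K).
func (p *pendingIndex) Set(key string, ts mtime) {
	if e, ok := p.byKey[key]; ok {
		e.ts = ts
		heap.Fix(&p.h, e.index)
		return
	}
	e := &keyEntry{key: key, ts: ts}
	heap.Push(&p.h, e)
	p.byKey[key] = e
}

// Remove drops key once it has no pending elements, in O(log K).
func (p *pendingIndex) Remove(key string) {
	if e, ok := p.byKey[key]; ok {
		heap.Remove(&p.h, e.index)
		delete(p.byKey, key)
	}
}

// Min returns the global minimum pending timestamp in O(1) by peeking at the root.
func (p *pendingIndex) Min() mtime {
	if len(p.h) == 0 {
		return maxTime
	}
	return p.h[0].ts
}

func main() {
	p := newPendingIndex()
	p.Set("a", 30)
	p.Set("b", 10)
	p.Set("c", 20)
	fmt.Println(p.Min()) // 10
	p.Remove("b")
	fmt.Println(p.Min()) // 20
	p.Set("a", 5)
	fmt.Println(p.Min()) // 5
}
```

Storing each entry's index (maintained in Swap) is what lets heap.Fix and heap.Remove run in O(log K); without it, updating one key would need an O(K) search for its entry and undo the gain.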


Technical Details & Impact:

  • Scaling: In benchmarks from 1k to 1M keys, read latency stays flat at about 13ns regardless of key count, replacing the current linear growth.

  • Memory Efficiency: Reads perform 0 allocations, because peeking at the heap root creates no temporary objects. This keeps the watermark read path off the garbage collector's workload and keeps the scheduler efficient.

  • CPU Friendly: Instead of walking the entries of a map scattered across memory, a read takes the minimum directly from the first element of a contiguous, heap-ordered slice, avoiding per-key CPU work on every refresh.

  • Consistent Speed: Because read time holds at around 13ns regardless of pipeline size, watermark refreshes should not jitter or slow down as the number of keys grows.

Issue Priority

Priority: 2 (default / most normal work should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Prism Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests