From a8ed2f2620409f3a33b75de1761557ae9a68b7fe Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Thu, 30 Apr 2026 10:33:47 -0700 Subject: [PATCH 1/4] Adds backlog reporting support for non-fnapi based SDF's. --- ...oundedSplittableProcessElementInvoker.java | 55 +++++- .../SplittableParDoViaKeyedWorkItems.java | 9 + .../core/SplittableProcessElementInvoker.java | 22 ++- .../apache/beam/runners/core/StepContext.java | 6 + ...edSplittableProcessElementInvokerTest.java | 29 +++ .../core/SplittableParDoProcessFnTest.java | 171 +++++++++++++++++- .../worker/SplittableProcessFnFactory.java | 1 + .../worker/StreamingModeExecutionContext.java | 16 ++ .../StreamingModeExecutionContextTest.java | 25 +++ .../java/org/apache/beam/sdk/io/Read.java | 30 ++- .../beam/sdk/transforms/PeriodicSequence.java | 3 +- 11 files changed, 349 insertions(+), 18 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 07e4756885dd..00e19d8db9b6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -278,8 +278,61 @@ public FinishBundleContext finishBundleContext(DoFn doFn) { if (residual == null) { return new Result(null, cont, null, null); } + final KV> residualForGetSize = residual; + double backlogBytes = + invoker.invokeGetSize( + new DoFnInvoker.BaseArgumentProvider() { + @Override + public String getErrorContext() { + return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName() + + "/GetSize"; + } + + @Override + public Object restriction() { + return residualForGetSize.getKey(); + } + + @Override + public InputT element(DoFn doFn) { + return element.getValue(); + } + + @Override + public Instant timestamp(DoFn doFn) { + return element.getTimestamp(); + } + + @Override + public RestrictionTracker restrictionTracker() { + return tracker; + } + + @Override + public WatermarkEstimator watermarkEstimator() { + return watermarkEstimator; + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override + public Object sideInput(String tagId) { + PCollectionView view = sideInputMapping.get(tagId); + if (view == null) { + throw new IllegalArgumentException("calling getSideInput() with unknown view"); + } + return processContext.sideInput(view); + } + }); return new Result( - residual.getKey(), cont, residual.getValue().getKey(), residual.getValue().getValue()); + residual.getKey(), + cont, + residual.getValue().getKey(), + residual.getValue().getValue(), + backlogBytes); } private class ProcessContext extends DoFn.ProcessContext diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 3519a74aada7..39125b2d1ba4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -22,6 +22,7 @@ import com.google.auto.service.AutoService; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; @@ -281,6 +282,7 @@ public static class ProcessFn invoker; + private transient @Nullable Consumer backlogBytesCallback; public ProcessFn( DoFn fn, @@ -323,6 +325,10 @@ public void setProcessElementInvoker( this.processElementInvoker = invoker; } + public void setBacklogBytesCallback(Consumer backlogBytesCallback) { + this.backlogBytesCallback = backlogBytesCallback; + } + public DoFn getFn() { return fn; } @@ -622,6 +628,9 @@ public String getErrorContext() { } else { holdState.clear(); } + if (backlogBytesCallback != null && result.getBacklogBytes() >= 0) { + backlogBytesCallback.accept(result.getBacklogBytes()); + } } private DoFnInvoker.ArgumentProvider wrapOptionsAsSetup( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java index 1ff66d6e517c..52c9f6712b53 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java @@ -42,6 +42,7 @@ public class Result { private final DoFn.ProcessContinuation continuation; private final @Nullable Instant futureOutputWatermark; private final @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState; + private final double backlogBytes; @SuppressFBWarnings( value = "NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE", @@ -50,12 +51,27 @@ public Result( @Nullable RestrictionT residualRestriction, DoFn.ProcessContinuation continuation, @Nullable Instant futureOutputWatermark, - @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) { + @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState, + double backlogBytes) { checkArgument(continuation != null, "continuation must not be null"); this.continuation = continuation; this.residualRestriction = residualRestriction; this.futureOutputWatermark = futureOutputWatermark; this.futureWatermarkEstimatorState = futureWatermarkEstimatorState; + this.backlogBytes = backlogBytes; + } + + public Result( + @Nullable RestrictionT residualRestriction, + DoFn.ProcessContinuation continuation, + @Nullable Instant futureOutputWatermark, + @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) { + this( + residualRestriction, + continuation, + futureOutputWatermark, + futureWatermarkEstimatorState, + -1.0); } /** @@ -76,6 +92,10 @@ public DoFn.ProcessContinuation getContinuation() { public @Nullable WatermarkEstimatorStateT getFutureWatermarkEstimatorState() { return futureWatermarkEstimatorState; } + + public double getBacklogBytes() { + return backlogBytes; + } } /** diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java index d2a03ff6ab39..e07d1a105862 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java @@ -36,4 +36,10 @@ public interface StepContext { default BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException("BundleFinalizer is unsupported."); } + + /** + * Set the current backlog bytes for this step. This is mainly used by splittable DoFn to report + * the size of the residual restriction. + */ + default void setBacklogBytes(double backlogBytes) {} } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index 1750cceffa0e..f6295fa3463a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -95,6 +95,24 @@ public OffsetRange getInitialRestriction(@SuppressWarnings("unused") @Element Vo } } + private static class GetSizeFn extends DoFn { + @ProcessElement + public ProcessContinuation process(RestrictionTracker tracker) { + assertTrue(tracker.tryClaim(tracker.currentRestriction().getFrom())); + return resume(); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction() { + return new OffsetRange(0, 10); + } + + @GetSize + public double getSize(@Restriction OffsetRange range) { + return range.getTo() - range.getFrom(); + } + } + private SplittableProcessElementInvoker.Result runTest( int totalNumOutputs, Duration sleepBeforeFirstClaim, @@ -237,4 +255,15 @@ public OffsetRange getInitialRestriction( e.expectMessage("Output is not allowed after a failed tryClaim()"); runTest(brokenFn, new OffsetRange(0, 5)); } + + @Test + public void testBacklogBytes() throws Exception { + GetSizeFn fn = new GetSizeFn(); + OffsetRange initialRestriction = new OffsetRange(0, 10); + SplittableProcessElementInvoker.Result res = + runTest(fn, initialRestriction); + // GetSizeFn only claims 1 element and then takes a checkpoint. + assertEquals(9.0, res.getBacklogBytes(), 0.001); + assertEquals(new OffsetRange(1, 10), res.getResidualRestriction()); + } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index ef1f201ca1ee..35444003195e 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; +import java.lang.reflect.Constructor; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -44,12 +45,16 @@ import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.ResetDateTimeProvider; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.PeriodicSequence.SequenceDefinition; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; @@ -68,6 +73,7 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; @@ -140,6 +146,8 @@ private static class ProcessFnTester< private InMemoryTimerInternals timerInternals; private TestInMemoryStateInternals stateInternals; private InMemoryBundleFinalizer bundleFinalizer; + private final ProcessFn + processFn; ProcessFnTester( Instant currentProcessingTime, @@ -154,15 +162,14 @@ private static class ProcessFnTester< // encode IntervalWindow's because that's what all tests here use. WindowingStrategy windowingStrategy = (WindowingStrategy) WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1))); - final ProcessFn - processFn = - new ProcessFn<>( - fn, - inputCoder, - restrictionCoder, - watermarkEstimatorStateCoder, - windowingStrategy, - Collections.emptyMap()); + this.processFn = + new ProcessFn<>( + fn, + inputCoder, + restrictionCoder, + watermarkEstimatorStateCoder, + windowingStrategy, + Collections.emptyMap()); this.tester = DoFnTester.of(processFn); this.timerInternals = new InMemoryTimerInternals(); this.stateInternals = new TestInMemoryStateInternals<>("dummy"); @@ -386,6 +393,35 @@ public WatermarkEstimators.Manual newWatermarkEstimator( } } + private static class GetSizeFn extends DoFn { + @ProcessElement + public ProcessContinuation process( + ProcessContext c, RestrictionTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + c.output(String.valueOf(i)); + if (i == 1) { + return resume(); + } + } + return stop(); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction() { + return new OffsetRange(0, 10); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange range) { + return new OffsetRangeTracker(range); + } + + @GetSize + public double getSize(@Restriction OffsetRange range) { + return range.getTo() - range.getFrom(); + } + } + @Test public void testDrains() throws Exception { DoFn fn = new WatermarkUpdateFn(); @@ -684,4 +720,121 @@ public void testInvokesLifecycleMethods() throws Exception { tester.startElement(42, new SomeRestriction()); } } + + @Test + public void testReportsBacklog() throws Exception { + DoFn fn = new GetSizeFn(); + Instant base = Instant.now(); + final List backlogs = new ArrayList<>(); + + try (ProcessFnTester tester = + new ProcessFnTester<>( + base, + fn, + BigEndianIntegerCoder.of(), + SerializableCoder.of(OffsetRange.class), + VoidCoder.of(), + MAX_OUTPUTS_PER_BUNDLE, + MAX_BUNDLE_DURATION)) { + tester.processFn.setBacklogBytesCallback(backlogs::add); + + tester.startElement(42, new OffsetRange(0, 10)); + // First call outputs 0, 1 and then resumes. + // The residual range should be [2, 10), so size is 8. + assertEquals(1, backlogs.size()); + assertEquals(8.0, backlogs.get(0), 0.001); + } + } + + @Test + public void testPeriodicSequenceBacklog() throws Exception { + Instant base = Instant.now(); + // Start 10 seconds ago, end 10 seconds from now, interval 1 second. + // PeriodicSequenceFn should have some backlog. + SequenceDefinition definition = + new SequenceDefinition( + base.minus(Duration.standardSeconds(10)), + base.plus(Duration.standardSeconds(10)), + Duration.standardSeconds(1)); + + // PeriodicSequenceFn is private, so we use reflection to instantiate it. + Class fnClass = + Class.forName("org.apache.beam.sdk.transforms.PeriodicSequence$PeriodicSequenceFn"); + Constructor constructor = fnClass.getDeclaredConstructor(); + constructor.setAccessible(true); + @SuppressWarnings("unchecked") + DoFn fn = + (DoFn) constructor.newInstance(); + + final List backlogs = new ArrayList<>(); + try (ProcessFnTester tester = + new ProcessFnTester<>( + base, + fn, + SerializableCoder.of(SequenceDefinition.class), + SerializableCoder.of(OffsetRange.class), + InstantCoder.of(), + 0, // Force immediate checkpoint after 0 outputs to measure initial backlog + MAX_BUNDLE_DURATION)) { + tester.processFn.setBacklogBytesCallback(backlogs::add); + + // Initial range from PeriodicSequenceFn.getInitialRange + OffsetRange initialRange = + new OffsetRange( + definition.first.getMillis() - definition.durationMilliSec, + definition.last.getMillis()); + + tester.startElement(definition, initialRange); + + // We expect some backlog to be reported. + assertFalse(backlogs.isEmpty()); + assertTrue("Backlog should be positive", backlogs.get(0) > 0); + // 10 seconds in the past + the element at now = 11 elements? + // Actually (now - (base-11s)) / 1s = 11. + // We expect at least 80 bytes (10 elements). + assertThat(backlogs.get(0), greaterThanOrEqualTo(80.0)); + } + } + + @Test + public void testUnboundedCountingSourceBacklog() throws Exception { + Instant base = Instant.now(); + UnboundedSource source = CountingSource.unbounded(); + Read.UnboundedSourceAsSDFWrapperFn fn = + new Read.UnboundedSourceAsSDFWrapperFn<>(source.getCheckpointMarkCoder()); + + final List backlogs = new ArrayList<>(); + try (ProcessFnTester< + UnboundedSource, + ValueWithRecordId, + Read.UnboundedSourceAsSDFWrapperFn.UnboundedSourceRestriction, + Object, + Void> + tester = + new ProcessFnTester<>( + base, + fn, + (Coder) SerializableCoder.of(source.getClass()), + fn.restrictionCoder(), + VoidCoder.of(), + 1, // Force checkpoint after 1 output + MAX_BUNDLE_DURATION)) { + tester.processFn.setBacklogBytesCallback(backlogs::add); + + // Input element needs to be the source itself + Read.UnboundedSourceAsSDFWrapperFn.UnboundedSourceRestriction restriction = + Read.UnboundedSourceAsSDFWrapperFn.UnboundedSourceRestriction.create( + source, null, BoundedWindow.TIMESTAMP_MIN_VALUE); + tester.startElement(source, restriction); + + // We expect backlog to be reported for unbounded too. + // CountingSource.unbounded() reports Long.MAX_VALUE if not started, but here it's started in + // getSize. + // getSize for unbounded source is now implemented in Read.java. + assertFalse("Backlog should have been reported for unbounded", backlogs.isEmpty()); + // CountingSource.unbounded typically reports a large value or based on getSplitBacklogBytes + // which is 8 * elements. + assertThat(backlogs.get(0), greaterThanOrEqualTo(0.0)); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java index 93c288fea9ea..3ad443ee2a2b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java @@ -157,6 +157,7 @@ public DoFnRunner>, OutputT> crea 10000, Duration.standardSeconds(10), stepContext::bundleFinalizer)); + processFn.setBacklogBytesCallback(userStepContext::setBacklogBytes); DoFnRunner>, OutputT> simpleRunner = new SimpleDoFnRunner<>( options, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index f75d452b211b..e1f1b21e135b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -254,6 +254,7 @@ public void start( : WindmillTagEncodingV1.instance(); this.outputBuilder = outputBuilder; this.sideInputCache.clear(); + this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; clearSinkFullHint(); Instant processingTime = computeProcessingTime(work.getWorkItem().getTimers().getTimersList()); @@ -528,6 +529,11 @@ public Map> flushState() { getWorkItem().getWorkToken(), activeReader); activeReader = null; + } else if (backlogBytes != UnboundedReader.BACKLOG_UNKNOWN && backlogBytes != 1L) { + // If activeReader is null, we might still have backlogBytes from an SDF. We ignore a reported + // backlogBytes of 1 since older versions of the Java SDK use this value as a default when + // RestrictionTracker.getProgress() or GetSize() are not defined. + outputBuilder.setSourceBacklogBytes(backlogBytes); } return callbacks; } @@ -726,6 +732,11 @@ public DataflowStepContext namespacedToUser() { public BundleFinalizer bundleFinalizer() { return wrapped.bundleFinalizer(); } + + @Override + public void setBacklogBytes(double backlogBytes) { + wrapped.setBacklogBytes(backlogBytes); + } } /** A {@link SideInputReader} that fetches side inputs from the streaming worker's cache. */ @@ -856,6 +867,11 @@ public void flushState() { userTimerInternals.persistTo(outputBuilder); } + @Override + public void setBacklogBytes(double backlogBytes) { + StreamingModeExecutionContext.this.backlogBytes = (long) backlogBytes; + } + @Override public TimerData getNextFiredTimer(Coder windowCoder) { if (cachedFiredSystemTimers == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 4bfa6efc8880..a1c7609e5af1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -431,4 +431,29 @@ public void testStateTagEncodingBasedOnConfig() { assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass()); } } + + @Test + public void testSetBacklogBytes() { + Windmill.WorkItemCommitRequest.Builder outputBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + NameContext nameContext = NameContextsForTests.nameContextForTest(); + DataflowOperationContext operationContext = + executionContext.createOperationContext(nameContext); + StreamingModeExecutionContext.StepContext stepContext = + executionContext.getStepContext(operationContext); + + executionContext.start( + "key", + createMockWork( + Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L).build(), + Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), + stateReader, + sideInputStateFetcher, + outputBuilder); + + stepContext.setBacklogBytes(1234.0); + executionContext.flushState(); + + assertEquals(1234, outputBuilder.getSourceBacklogBytes()); + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 43920abb7371..91962807b6db 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -479,7 +479,7 @@ public Progress getProgress() { * maintain any state. */ @UnboundedPerElement - static class UnboundedSourceAsSDFWrapperFn + public static class UnboundedSourceAsSDFWrapperFn extends DoFn, ValueWithRecordId> { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class); @@ -493,7 +493,7 @@ static class UnboundedSourceAsSDFWrapperFn> restrictionCoder; @VisibleForTesting - UnboundedSourceAsSDFWrapperFn(Coder checkpointCoder) { + public UnboundedSourceAsSDFWrapperFn(Coder checkpointCoder) { this.checkpointCoder = checkpointCoder; readerCacheSupplier = new MemoizingPerInstantiationSerializableSupplier<>( @@ -605,7 +605,8 @@ public ProcessContinuation processElement( @SuppressWarnings("ReferenceEquality") boolean isInitialRestriction = initialRestriction == currentRestriction; CheckpointT checkpoint = currentRestriction.getCheckpoint(); - if (checkpoint != null + if (bundleFinalizer != null + && checkpoint != null && !isInitialRestriction && !(tracker.currentRestriction().getCheckpoint() instanceof NoopCheckpointMark)) { bundleFinalizer.afterBundleCommit( @@ -614,7 +615,7 @@ public ProcessContinuation processElement( } // If we have been split/checkpoint by a runner, the tracker will have been updated to the - // empty source and we will return stop. Otherwise the unbounded source has only temporarily + // empty source, and we will return stop. Otherwise, the unbounded source has only temporarily // run out of work. if (currentRestriction.getSource() instanceof EmptyUnboundedSource) { return ProcessContinuation.stop(); @@ -626,6 +627,22 @@ public ProcessContinuation processElement( return ProcessContinuation.resume(); } + @GetSize + public double getSize( + @Restriction UnboundedSourceRestriction restriction, + PipelineOptions pipelineOptions) + throws Exception { + try (UnboundedReader reader = + restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint())) { + reader.start(); + long backlog = reader.getSplitBacklogBytes(); + if (backlog != UnboundedReader.BACKLOG_UNKNOWN) { + return (double) backlog; + } + } + return -1.0; + } + @GetInitialWatermarkEstimatorState public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) { return currentElementTimestamp; @@ -661,7 +678,7 @@ public Coder> restrictionCoder( * splittable DoFn for each output element. */ @AutoValue - abstract static class UnboundedSourceValue { + public abstract static class UnboundedSourceValue { public static UnboundedSourceValue create( byte[] id, T value, Instant timestamp, Instant watermark) { @@ -698,7 +715,8 @@ public static CacheState create( * future {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} calls. */ @AutoValue - abstract static class UnboundedSourceRestriction + public abstract static class UnboundedSourceRestriction< + OutputT, CheckpointT extends CheckpointMark> implements Serializable { @SuppressWarnings("nullness") // https://github.com/google/auto/issues/1320 public static diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java index 096da842b820..b0bb34425dd0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java @@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.io.Serializable; import java.util.Objects; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.range.OffsetRange; @@ -54,7 +55,7 @@ public class PeriodicSequence extends PTransform, PCollection> { @DefaultSchema(JavaFieldSchema.class) - public static class SequenceDefinition { + public static class SequenceDefinition implements Serializable { public Instant first; public Instant last; public Long durationMilliSec; From 71ffc9a4c20a732126810f0494629d3052b6a78c Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Fri, 1 May 2026 10:43:44 -0700 Subject: [PATCH 2/4] Adds call to newTracker invoker for when GetSize is not defined on an SDF. --- ...oundedSplittableProcessElementInvoker.java | 69 +++++++-- ...edSplittableProcessElementInvokerTest.java | 32 +++-- .../core/SplittableParDoProcessFnTest.java | 131 ++++++------------ .../java/org/apache/beam/sdk/io/Read.java | 30 +--- .../beam/sdk/transforms/PeriodicSequence.java | 3 +- 5 files changed, 123 insertions(+), 142 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 00e19d8db9b6..23c234c2d965 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -279,6 +279,7 @@ public FinishBundleContext finishBundleContext(DoFn doFn) { return new Result(null, cont, null, null); } final KV> residualForGetSize = residual; + // For a list of all DoFnInvoker arguments, see DoFn.java. double backlogBytes = invoker.invokeGetSize( new DoFnInvoker.BaseArgumentProvider() { @@ -289,13 +290,13 @@ public String getErrorContext() { } @Override - public Object restriction() { - return residualForGetSize.getKey(); + public InputT element(DoFn doFn) { + return element.getValue(); } @Override - public InputT element(DoFn doFn) { - return element.getValue(); + public Object restriction() { + return residualForGetSize.getKey(); } @Override @@ -305,26 +306,64 @@ public Instant timestamp(DoFn doFn) { @Override public RestrictionTracker restrictionTracker() { - return tracker; + return invoker.invokeNewTracker( + new DoFnInvoker.BaseArgumentProvider() { + @Override + public String getErrorContext() { + return OutputAndTimeBoundedSplittableProcessElementInvoker.class + .getSimpleName() + + "/NewTracker"; + } + + @Override + public InputT element(DoFn doFn) { + return element.getValue(); + } + + @Override + public Object restriction() { + return residualForGetSize.getKey(); + } + + @Override + public Instant timestamp(DoFn doFn) { + return element.getTimestamp(); + } + + @Override + public BoundedWindow window() { + throw new IllegalStateException( + "Attempting to access window outside of a windowed context"); + } + + @Override + public PaneInfo paneInfo(DoFn doFn) { + throw new IllegalStateException( + "Attempting to access PaneInfo outside of a windowed context"); + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + }); } @Override - public WatermarkEstimator watermarkEstimator() { - return watermarkEstimator; + public BoundedWindow window() { + throw new IllegalStateException( + "Attempting to access window outside of a windowed context"); } @Override - public PipelineOptions pipelineOptions() { - return pipelineOptions; + public PaneInfo paneInfo(DoFn doFn) { + throw new IllegalStateException( + "Attempting to access PaneInfo outside of a windowed context"); } @Override - public Object sideInput(String tagId) { - PCollectionView view = sideInputMapping.get(tagId); - if (view == null) { - throw new IllegalArgumentException("calling getSideInput() with unknown view"); - } - return processContext.sideInput(view); + public PipelineOptions pipelineOptions() { + return pipelineOptions; } }); return new Result( diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index f6295fa3463a..52ac6b1a8199 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -97,9 +97,15 @@ public OffsetRange getInitialRestriction(@SuppressWarnings("unused") @Element Vo private static class GetSizeFn extends DoFn { @ProcessElement - public ProcessContinuation process(RestrictionTracker tracker) { - assertTrue(tracker.tryClaim(tracker.currentRestriction().getFrom())); - return resume(); + public ProcessContinuation process( + ProcessContext c, RestrictionTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + c.output(String.valueOf(i)); + if (i == 2) { + return resume(); + } + } + return stop(); } @GetInitialRestriction @@ -121,11 +127,12 @@ private SplittableProcessElementInvoker.R throws Exception { SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall, sleepBeforeEachOutput); OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs); - return runTest(fn, initialRestriction); + return runTest(fn, initialRestriction, Duration.standardSeconds(3)); } private SplittableProcessElementInvoker.Result runTest( - DoFn fn, OffsetRange initialRestriction) throws Exception { + DoFn fn, OffsetRange initialRestriction, Duration checkpointDuration) + throws Exception { SplittableProcessElementInvoker invoker = new OutputAndTimeBoundedSplittableProcessElementInvoker<>( fn, @@ -140,7 +147,7 @@ public void output(TupleTag tag, WindowedValue outpu NullSideInputReader.empty(), Executors.newSingleThreadScheduledExecutor(), 1000, - Duration.standardSeconds(3), + checkpointDuration, () -> { throw new UnsupportedOperationException("BundleFinalizer not configured for test."); }); @@ -233,7 +240,7 @@ public OffsetRange getInitialRestriction( } }; e.expectMessage("Output is not allowed before tryClaim()"); - runTest(brokenFn, new OffsetRange(0, 5)); + runTest(brokenFn, new OffsetRange(0, 5), Duration.standardSeconds(3)); } @Test @@ -253,17 +260,18 @@ public OffsetRange getInitialRestriction( } }; e.expectMessage("Output is not allowed after a failed tryClaim()"); - runTest(brokenFn, new OffsetRange(0, 5)); + runTest(brokenFn, new OffsetRange(0, 5), Duration.standardSeconds(3)); } @Test public void testBacklogBytes() throws Exception { GetSizeFn fn = new GetSizeFn(); OffsetRange initialRestriction = new OffsetRange(0, 10); + // Set a high checkpoint duration to prevent flakiness caused by early checkpointing. SplittableProcessElementInvoker.Result res = - runTest(fn, initialRestriction); - // GetSizeFn only claims 1 element and then takes a checkpoint. - assertEquals(9.0, res.getBacklogBytes(), 0.001); - assertEquals(new OffsetRange(1, 10), res.getResidualRestriction()); + runTest(fn, initialRestriction, Duration.standardMinutes(3)); + // GetSizeFn claims 3 elements and then takes a checkpoint. + assertEquals(7.0, res.getBacklogBytes(), 0.001); + assertEquals(new OffsetRange(3, 10), res.getResidualRestriction()); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index 35444003195e..381e41c98705 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -31,7 +31,6 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; -import java.lang.reflect.Constructor; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -45,16 +44,12 @@ import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.CountingSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.testing.ResetDateTimeProvider; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.PeriodicSequence.SequenceDefinition; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker; @@ -73,7 +68,6 @@ import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; -import org.apache.beam.sdk.values.ValueWithRecordId; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.sdk.values.WindowingStrategy; @@ -399,7 +393,7 @@ public ProcessContinuation process( ProcessContext c, RestrictionTracker tracker) { for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { c.output(String.valueOf(i)); - if (i == 1) { + if (i == 2) { return resume(); } } @@ -422,6 +416,32 @@ public double getSize(@Restriction OffsetRange range) { } } + // Used to check that backlog can be computed from the restriction tracker if GetSize is not + // defined. + private static class SdfWithoutGetSize extends DoFn { + @ProcessElement + public ProcessContinuation process( + ProcessContext c, RestrictionTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + c.output(String.valueOf(i)); + if (i == 2) { + return resume(); + } + } + return stop(); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction() { + return new OffsetRange(0, 10); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange range) { + return new OffsetRangeTracker(range); + } + } + @Test public void testDrains() throws Exception { DoFn fn = new WatermarkUpdateFn(); @@ -739,102 +759,35 @@ public void testReportsBacklog() throws Exception { tester.processFn.setBacklogBytesCallback(backlogs::add); tester.startElement(42, new OffsetRange(0, 10)); - // First call outputs 0, 1 and then resumes. - // The residual range should be [2, 10), so size is 8. + // First call outputs 0, 1, and 2, and then resumes. + // The residual range should be [3, 10), so size is 7. assertEquals(1, backlogs.size()); - assertEquals(8.0, backlogs.get(0), 0.001); + assertEquals(7.0, backlogs.get(0), 0.001); } } @Test - public void testPeriodicSequenceBacklog() throws Exception { + public void testReportsBacklogWithoutGetSize() throws Exception { + DoFn fn = new SdfWithoutGetSize(); Instant base = Instant.now(); - // Start 10 seconds ago, end 10 seconds from now, interval 1 second. - // PeriodicSequenceFn should have some backlog. - SequenceDefinition definition = - new SequenceDefinition( - base.minus(Duration.standardSeconds(10)), - base.plus(Duration.standardSeconds(10)), - Duration.standardSeconds(1)); - - // PeriodicSequenceFn is private, so we use reflection to instantiate it. - Class fnClass = - Class.forName("org.apache.beam.sdk.transforms.PeriodicSequence$PeriodicSequenceFn"); - Constructor constructor = fnClass.getDeclaredConstructor(); - constructor.setAccessible(true); - @SuppressWarnings("unchecked") - DoFn fn = - (DoFn) constructor.newInstance(); - final List backlogs = new ArrayList<>(); - try (ProcessFnTester tester = + + try (ProcessFnTester tester = new ProcessFnTester<>( base, fn, - SerializableCoder.of(SequenceDefinition.class), + BigEndianIntegerCoder.of(), SerializableCoder.of(OffsetRange.class), - InstantCoder.of(), - 0, // Force immediate checkpoint after 0 outputs to measure initial backlog + VoidCoder.of(), + MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION)) { tester.processFn.setBacklogBytesCallback(backlogs::add); - // Initial range from PeriodicSequenceFn.getInitialRange - OffsetRange initialRange = - new OffsetRange( - definition.first.getMillis() - definition.durationMilliSec, - definition.last.getMillis()); - - tester.startElement(definition, initialRange); - - // We expect some backlog to be reported. - assertFalse(backlogs.isEmpty()); - assertTrue("Backlog should be positive", backlogs.get(0) > 0); - // 10 seconds in the past + the element at now = 11 elements? - // Actually (now - (base-11s)) / 1s = 11. - // We expect at least 80 bytes (10 elements). - assertThat(backlogs.get(0), greaterThanOrEqualTo(80.0)); - } - } - - @Test - public void testUnboundedCountingSourceBacklog() throws Exception { - Instant base = Instant.now(); - UnboundedSource source = CountingSource.unbounded(); - Read.UnboundedSourceAsSDFWrapperFn fn = - new Read.UnboundedSourceAsSDFWrapperFn<>(source.getCheckpointMarkCoder()); - - final List backlogs = new ArrayList<>(); - try (ProcessFnTester< - UnboundedSource, - ValueWithRecordId, - Read.UnboundedSourceAsSDFWrapperFn.UnboundedSourceRestriction, - Object, - Void> - tester = - new ProcessFnTester<>( - base, - fn, - (Coder) SerializableCoder.of(source.getClass()), - fn.restrictionCoder(), - VoidCoder.of(), - 1, // Force checkpoint after 1 output - MAX_BUNDLE_DURATION)) { - tester.processFn.setBacklogBytesCallback(backlogs::add); - - // Input element needs to be the source itself - Read.UnboundedSourceAsSDFWrapperFn.UnboundedSourceRestriction restriction = - Read.UnboundedSourceAsSDFWrapperFn.UnboundedSourceRestriction.create( - source, null, BoundedWindow.TIMESTAMP_MIN_VALUE); - tester.startElement(source, restriction); - - // We expect backlog to be reported for unbounded too. - // CountingSource.unbounded() reports Long.MAX_VALUE if not started, but here it's started in - // getSize. - // getSize for unbounded source is now implemented in Read.java. - assertFalse("Backlog should have been reported for unbounded", backlogs.isEmpty()); - // CountingSource.unbounded typically reports a large value or based on getSplitBacklogBytes - // which is 8 * elements. - assertThat(backlogs.get(0), greaterThanOrEqualTo(0.0)); + tester.startElement(42, new OffsetRange(0, 10)); + // First call outputs 0, 1, and 2, and then resumes. + // The residual range should be [3, 10), so size is 7. + assertEquals(1, backlogs.size()); + assertEquals(7.0, backlogs.get(0), 0.001); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java index 91962807b6db..43920abb7371 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java @@ -479,7 +479,7 @@ public Progress getProgress() { * maintain any state. */ @UnboundedPerElement - public static class UnboundedSourceAsSDFWrapperFn + static class UnboundedSourceAsSDFWrapperFn extends DoFn, ValueWithRecordId> { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceAsSDFWrapperFn.class); @@ -493,7 +493,7 @@ public static class UnboundedSourceAsSDFWrapperFn> restrictionCoder; @VisibleForTesting - public UnboundedSourceAsSDFWrapperFn(Coder checkpointCoder) { + UnboundedSourceAsSDFWrapperFn(Coder checkpointCoder) { this.checkpointCoder = checkpointCoder; readerCacheSupplier = new MemoizingPerInstantiationSerializableSupplier<>( @@ -605,8 +605,7 @@ public ProcessContinuation processElement( @SuppressWarnings("ReferenceEquality") boolean isInitialRestriction = initialRestriction == currentRestriction; CheckpointT checkpoint = currentRestriction.getCheckpoint(); - if (bundleFinalizer != null - && checkpoint != null + if (checkpoint != null && !isInitialRestriction && !(tracker.currentRestriction().getCheckpoint() instanceof NoopCheckpointMark)) { bundleFinalizer.afterBundleCommit( @@ -615,7 +614,7 @@ public ProcessContinuation processElement( } // If we have been split/checkpoint by a runner, the tracker will have been updated to the - // empty source, and we will return stop. Otherwise, the unbounded source has only temporarily + // empty source and we will return stop. Otherwise the unbounded source has only temporarily // run out of work. if (currentRestriction.getSource() instanceof EmptyUnboundedSource) { return ProcessContinuation.stop(); @@ -627,22 +626,6 @@ public ProcessContinuation processElement( return ProcessContinuation.resume(); } - @GetSize - public double getSize( - @Restriction UnboundedSourceRestriction restriction, - PipelineOptions pipelineOptions) - throws Exception { - try (UnboundedReader reader = - restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint())) { - reader.start(); - long backlog = reader.getSplitBacklogBytes(); - if (backlog != UnboundedReader.BACKLOG_UNKNOWN) { - return (double) backlog; - } - } - return -1.0; - } - @GetInitialWatermarkEstimatorState public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) { return currentElementTimestamp; @@ -678,7 +661,7 @@ public Coder> restrictionCoder( * splittable DoFn for each output element. */ @AutoValue - public abstract static class UnboundedSourceValue { + abstract static class UnboundedSourceValue { public static UnboundedSourceValue create( byte[] id, T value, Instant timestamp, Instant watermark) { @@ -715,8 +698,7 @@ public static CacheState create( * future {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement @ProcessElement} calls. */ @AutoValue - public abstract static class UnboundedSourceRestriction< - OutputT, CheckpointT extends CheckpointMark> + abstract static class UnboundedSourceRestriction implements Serializable { @SuppressWarnings("nullness") // https://github.com/google/auto/issues/1320 public static diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java index b0bb34425dd0..096da842b820 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PeriodicSequence.java @@ -21,7 +21,6 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; -import java.io.Serializable; import java.util.Objects; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.io.range.OffsetRange; @@ -55,7 +54,7 @@ public class PeriodicSequence extends PTransform, PCollection> { @DefaultSchema(JavaFieldSchema.class) - public static class SequenceDefinition implements Serializable { + public static class SequenceDefinition { public Instant first; public Instant last; public Long durationMilliSec; From 2f0693834ce08e5bffcc49b91c89f673cb51a4a2 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Fri, 1 May 2026 13:16:01 -0700 Subject: [PATCH 3/4] Trigger validates runner tests --- ...stCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json index 50d17c108f2e..e623d3373a93 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 2, + "modification": 1, } From 43222da15ba1e7a424699897b6522b577b3ea2c8 Mon Sep 17 00:00:00 2001 From: Andrew Crites Date: Fri, 1 May 2026 14:28:41 -0700 Subject: [PATCH 4/4] Adding side input to GetSize arguments. It is not technically part of the API in DoFn.java, but some tests are using it. --- ...putAndTimeBoundedSplittableProcessElementInvoker.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 23c234c2d965..cb3225ba808e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -365,6 +365,15 @@ public PaneInfo paneInfo(DoFn doFn) { public PipelineOptions pipelineOptions() { return pipelineOptions; } + + @Override + public Object sideInput(String tagId) { + PCollectionView view = sideInputMapping.get(tagId); + if (view == null) { + throw new IllegalArgumentException("calling getSideInput() with unknown view"); + } + return processContext.sideInput(view); + } }); return new Result( residual.getKey(),