diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json index 50d17c108f2e..e623d3373a93 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming_Engine.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 2, + "modification": 1, } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 07e4756885dd..cb3225ba808e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -278,8 +278,109 @@ public FinishBundleContext finishBundleContext(DoFn doFn) { if (residual == null) { return new Result(null, cont, null, null); } + final KV> residualForGetSize = residual; + // For a list of all DoFnInvoker arguments, see DoFn.java. + double backlogBytes = + invoker.invokeGetSize( + new DoFnInvoker.BaseArgumentProvider() { + @Override + public String getErrorContext() { + return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName() + + "/GetSize"; + } + + @Override + public InputT element(DoFn doFn) { + return element.getValue(); + } + + @Override + public Object restriction() { + return residualForGetSize.getKey(); + } + + @Override + public Instant timestamp(DoFn doFn) { + return element.getTimestamp(); + } + + @Override + public RestrictionTracker restrictionTracker() { + return invoker.invokeNewTracker( + new DoFnInvoker.BaseArgumentProvider() { + @Override + public String getErrorContext() { + return OutputAndTimeBoundedSplittableProcessElementInvoker.class + .getSimpleName() + + "/NewTracker"; + } + + @Override + public InputT element(DoFn doFn) { + return element.getValue(); + } + + @Override + public Object restriction() { + return residualForGetSize.getKey(); + } + + @Override + public Instant timestamp(DoFn doFn) { + return element.getTimestamp(); + } + + @Override + public BoundedWindow window() { + throw new IllegalStateException( + "Attempting to access window outside of a windowed context"); + } + + @Override + public PaneInfo paneInfo(DoFn doFn) { + throw new IllegalStateException( + "Attempting to access PaneInfo outside of a windowed context"); + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + }); + } + + @Override + public BoundedWindow window() { + throw new IllegalStateException( + "Attempting to access window outside of a windowed context"); + } + + @Override + public PaneInfo paneInfo(DoFn doFn) { + throw new IllegalStateException( + "Attempting to access PaneInfo outside of a windowed context"); + } + + @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override + public Object sideInput(String tagId) { + PCollectionView view = sideInputMapping.get(tagId); + if (view == null) { + throw new IllegalArgumentException("calling getSideInput() with unknown view"); + } + return processContext.sideInput(view); + } + }); return new Result( - residual.getKey(), cont, residual.getValue().getKey(), residual.getValue().getValue()); + residual.getKey(), + cont, + residual.getValue().getKey(), + residual.getValue().getValue(), + backlogBytes); } private class ProcessContext extends DoFn.ProcessContext diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index 3519a74aada7..39125b2d1ba4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -22,6 +22,7 @@ import com.google.auto.service.AutoService; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; @@ -281,6 +282,7 @@ public static class ProcessFn invoker; + private transient @Nullable Consumer backlogBytesCallback; public ProcessFn( DoFn fn, @@ -323,6 +325,10 @@ public void setProcessElementInvoker( this.processElementInvoker = invoker; } + public void setBacklogBytesCallback(Consumer backlogBytesCallback) { + this.backlogBytesCallback = backlogBytesCallback; + } + public DoFn getFn() { return fn; } @@ -622,6 +628,9 @@ public String getErrorContext() { } else { holdState.clear(); } + if (backlogBytesCallback != null && result.getBacklogBytes() >= 0) { + backlogBytesCallback.accept(result.getBacklogBytes()); + } } private DoFnInvoker.ArgumentProvider wrapOptionsAsSetup( diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java index 1ff66d6e517c..52c9f6712b53 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java @@ -42,6 +42,7 @@ public class Result { private final DoFn.ProcessContinuation continuation; private final @Nullable Instant futureOutputWatermark; private final @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState; + private final double backlogBytes; @SuppressFBWarnings( value = "NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE", @@ -50,12 +51,27 @@ public Result( @Nullable RestrictionT residualRestriction, DoFn.ProcessContinuation continuation, @Nullable Instant futureOutputWatermark, - @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) { + @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState, + double backlogBytes) { checkArgument(continuation != null, "continuation must not be null"); this.continuation = continuation; this.residualRestriction = residualRestriction; this.futureOutputWatermark = futureOutputWatermark; this.futureWatermarkEstimatorState = futureWatermarkEstimatorState; + this.backlogBytes = backlogBytes; + } + + public Result( + @Nullable RestrictionT residualRestriction, + DoFn.ProcessContinuation continuation, + @Nullable Instant futureOutputWatermark, + @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) { + this( + residualRestriction, + continuation, + futureOutputWatermark, + futureWatermarkEstimatorState, + -1.0); } /** @@ -76,6 +92,10 @@ public DoFn.ProcessContinuation getContinuation() { public @Nullable WatermarkEstimatorStateT getFutureWatermarkEstimatorState() { return futureWatermarkEstimatorState; } + + public double getBacklogBytes() { + return backlogBytes; + } } /** diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java index d2a03ff6ab39..e07d1a105862 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StepContext.java @@ -36,4 +36,10 @@ public interface StepContext { default BundleFinalizer bundleFinalizer() { throw new UnsupportedOperationException("BundleFinalizer is unsupported."); } + + /** + * Set the current backlog bytes for this step. This is mainly used by splittable DoFn to report + * the size of the residual restriction. + */ + default void setBacklogBytes(double backlogBytes) {} } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java index 1750cceffa0e..52ac6b1a8199 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java @@ -95,6 +95,30 @@ public OffsetRange getInitialRestriction(@SuppressWarnings("unused") @Element Vo } } + private static class GetSizeFn extends DoFn { + @ProcessElement + public ProcessContinuation process( + ProcessContext c, RestrictionTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + c.output(String.valueOf(i)); + if (i == 2) { + return resume(); + } + } + return stop(); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction() { + return new OffsetRange(0, 10); + } + + @GetSize + public double getSize(@Restriction OffsetRange range) { + return range.getTo() - range.getFrom(); + } + } + private SplittableProcessElementInvoker.Result runTest( int totalNumOutputs, Duration sleepBeforeFirstClaim, @@ -103,11 +127,12 @@ private SplittableProcessElementInvoker.R throws Exception { SomeFn fn = new SomeFn(sleepBeforeFirstClaim, numOutputsPerProcessCall, sleepBeforeEachOutput); OffsetRange initialRestriction = new OffsetRange(0, totalNumOutputs); - return runTest(fn, initialRestriction); + return runTest(fn, initialRestriction, Duration.standardSeconds(3)); } private SplittableProcessElementInvoker.Result runTest( - DoFn fn, OffsetRange initialRestriction) throws Exception { + DoFn fn, OffsetRange initialRestriction, Duration checkpointDuration) + throws Exception { SplittableProcessElementInvoker invoker = new OutputAndTimeBoundedSplittableProcessElementInvoker<>( fn, @@ -122,7 +147,7 @@ public void output(TupleTag tag, WindowedValue outpu NullSideInputReader.empty(), Executors.newSingleThreadScheduledExecutor(), 1000, - Duration.standardSeconds(3), + checkpointDuration, () -> { throw new UnsupportedOperationException("BundleFinalizer not configured for test."); }); @@ -215,7 +240,7 @@ public OffsetRange getInitialRestriction( } }; e.expectMessage("Output is not allowed before tryClaim()"); - runTest(brokenFn, new OffsetRange(0, 5)); + runTest(brokenFn, new OffsetRange(0, 5), Duration.standardSeconds(3)); } @Test @@ -235,6 +260,18 @@ public OffsetRange getInitialRestriction( } }; e.expectMessage("Output is not allowed after a failed tryClaim()"); - runTest(brokenFn, new OffsetRange(0, 5)); + runTest(brokenFn, new OffsetRange(0, 5), Duration.standardSeconds(3)); + } + + @Test + public void testBacklogBytes() throws Exception { + GetSizeFn fn = new GetSizeFn(); + OffsetRange initialRestriction = new OffsetRange(0, 10); + // Set a high checkpoint duration to prevent flakiness caused by early checkpointing. + SplittableProcessElementInvoker.Result res = + runTest(fn, initialRestriction, Duration.standardMinutes(3)); + // GetSizeFn claims 3 elements and then takes a checkpoint. + assertEquals(7.0, res.getBacklogBytes(), 0.001); + assertEquals(new OffsetRange(3, 10), res.getResidualRestriction()); } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java index ef1f201ca1ee..381e41c98705 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java @@ -140,6 +140,8 @@ private static class ProcessFnTester< private InMemoryTimerInternals timerInternals; private TestInMemoryStateInternals stateInternals; private InMemoryBundleFinalizer bundleFinalizer; + private final ProcessFn + processFn; ProcessFnTester( Instant currentProcessingTime, @@ -154,15 +156,14 @@ private static class ProcessFnTester< // encode IntervalWindow's because that's what all tests here use. WindowingStrategy windowingStrategy = (WindowingStrategy) WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1))); - final ProcessFn - processFn = - new ProcessFn<>( - fn, - inputCoder, - restrictionCoder, - watermarkEstimatorStateCoder, - windowingStrategy, - Collections.emptyMap()); + this.processFn = + new ProcessFn<>( + fn, + inputCoder, + restrictionCoder, + watermarkEstimatorStateCoder, + windowingStrategy, + Collections.emptyMap()); this.tester = DoFnTester.of(processFn); this.timerInternals = new InMemoryTimerInternals(); this.stateInternals = new TestInMemoryStateInternals<>("dummy"); @@ -386,6 +387,61 @@ public WatermarkEstimators.Manual newWatermarkEstimator( } } + private static class GetSizeFn extends DoFn { + @ProcessElement + public ProcessContinuation process( + ProcessContext c, RestrictionTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + c.output(String.valueOf(i)); + if (i == 2) { + return resume(); + } + } + return stop(); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction() { + return new OffsetRange(0, 10); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange range) { + return new OffsetRangeTracker(range); + } + + @GetSize + public double getSize(@Restriction OffsetRange range) { + return range.getTo() - range.getFrom(); + } + } + + // Used to check that backlog can be computed from the restriction tracker if GetSize is not + // defined. + private static class SdfWithoutGetSize extends DoFn { + @ProcessElement + public ProcessContinuation process( + ProcessContext c, RestrictionTracker tracker) { + for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) { + c.output(String.valueOf(i)); + if (i == 2) { + return resume(); + } + } + return stop(); + } + + @GetInitialRestriction + public OffsetRange getInitialRestriction() { + return new OffsetRange(0, 10); + } + + @NewTracker + public OffsetRangeTracker newTracker(@Restriction OffsetRange range) { + return new OffsetRangeTracker(range); + } + } + @Test public void testDrains() throws Exception { DoFn fn = new WatermarkUpdateFn(); @@ -684,4 +740,54 @@ public void testInvokesLifecycleMethods() throws Exception { tester.startElement(42, new SomeRestriction()); } } + + @Test + public void testReportsBacklog() throws Exception { + DoFn fn = new GetSizeFn(); + Instant base = Instant.now(); + final List backlogs = new ArrayList<>(); + + try (ProcessFnTester tester = + new ProcessFnTester<>( + base, + fn, + BigEndianIntegerCoder.of(), + SerializableCoder.of(OffsetRange.class), + VoidCoder.of(), + MAX_OUTPUTS_PER_BUNDLE, + MAX_BUNDLE_DURATION)) { + tester.processFn.setBacklogBytesCallback(backlogs::add); + + tester.startElement(42, new OffsetRange(0, 10)); + // First call outputs 0, 1, and 2, and then resumes. + // The residual range should be [3, 10), so size is 7. + assertEquals(1, backlogs.size()); + assertEquals(7.0, backlogs.get(0), 0.001); + } + } + + @Test + public void testReportsBacklogWithoutGetSize() throws Exception { + DoFn fn = new SdfWithoutGetSize(); + Instant base = Instant.now(); + final List backlogs = new ArrayList<>(); + + try (ProcessFnTester tester = + new ProcessFnTester<>( + base, + fn, + BigEndianIntegerCoder.of(), + SerializableCoder.of(OffsetRange.class), + VoidCoder.of(), + MAX_OUTPUTS_PER_BUNDLE, + MAX_BUNDLE_DURATION)) { + tester.processFn.setBacklogBytesCallback(backlogs::add); + + tester.startElement(42, new OffsetRange(0, 10)); + // First call outputs 0, 1, and 2, and then resumes. + // The residual range should be [3, 10), so size is 7. + assertEquals(1, backlogs.size()); + assertEquals(7.0, backlogs.get(0), 0.001); + } + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java index 93c288fea9ea..3ad443ee2a2b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java @@ -157,6 +157,7 @@ public DoFnRunner>, OutputT> crea 10000, Duration.standardSeconds(10), stepContext::bundleFinalizer)); + processFn.setBacklogBytesCallback(userStepContext::setBacklogBytes); DoFnRunner>, OutputT> simpleRunner = new SimpleDoFnRunner<>( options, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index f75d452b211b..e1f1b21e135b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -254,6 +254,7 @@ public void start( : WindmillTagEncodingV1.instance(); this.outputBuilder = outputBuilder; this.sideInputCache.clear(); + this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; clearSinkFullHint(); Instant processingTime = computeProcessingTime(work.getWorkItem().getTimers().getTimersList()); @@ -528,6 +529,11 @@ public Map> flushState() { getWorkItem().getWorkToken(), activeReader); activeReader = null; + } else if (backlogBytes != UnboundedReader.BACKLOG_UNKNOWN && backlogBytes != 1L) { + // If activeReader is null, we might still have backlogBytes from an SDF. We ignore a reported + // backlogBytes of 1 since older versions of the Java SDK use this value as a default when + // RestrictionTracker.getProgress() or GetSize() are not defined. + outputBuilder.setSourceBacklogBytes(backlogBytes); } return callbacks; } @@ -726,6 +732,11 @@ public DataflowStepContext namespacedToUser() { public BundleFinalizer bundleFinalizer() { return wrapped.bundleFinalizer(); } + + @Override + public void setBacklogBytes(double backlogBytes) { + wrapped.setBacklogBytes(backlogBytes); + } } /** A {@link SideInputReader} that fetches side inputs from the streaming worker's cache. */ @@ -856,6 +867,11 @@ public void flushState() { userTimerInternals.persistTo(outputBuilder); } + @Override + public void setBacklogBytes(double backlogBytes) { + StreamingModeExecutionContext.this.backlogBytes = (long) backlogBytes; + } + @Override public TimerData getNextFiredTimer(Coder windowCoder) { if (cachedFiredSystemTimers == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 4bfa6efc8880..a1c7609e5af1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -431,4 +431,29 @@ public void testStateTagEncodingBasedOnConfig() { assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass()); } } + + @Test + public void testSetBacklogBytes() { + Windmill.WorkItemCommitRequest.Builder outputBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + NameContext nameContext = NameContextsForTests.nameContextForTest(); + DataflowOperationContext operationContext = + executionContext.createOperationContext(nameContext); + StreamingModeExecutionContext.StepContext stepContext = + executionContext.getStepContext(operationContext); + + executionContext.start( + "key", + createMockWork( + Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L).build(), + Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), + stateReader, + sideInputStateFetcher, + outputBuilder); + + stepContext.setBacklogBytes(1234.0); + executionContext.flushState(); + + assertEquals(1234, outputBuilder.getSourceBacklogBytes()); + } }