Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,61 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
if (residual == null) {
return new Result(null, cont, null, null);
}
final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residualForGetSize = residual;
double backlogBytes =
invoker.invokeGetSize(
new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
@Override
public String getErrorContext() {
return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName()
+ "/GetSize";
}

@Override
public Object restriction() {
return residualForGetSize.getKey();
}

@Override
public InputT element(DoFn<InputT, OutputT> doFn) {
return element.getValue();
}

@Override
public Instant timestamp(DoFn<InputT, OutputT> doFn) {
return element.getTimestamp();
}

@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return tracker;
}

@Override
public WatermarkEstimator<?> watermarkEstimator() {
return watermarkEstimator;
}

@Override
public PipelineOptions pipelineOptions() {
return pipelineOptions;
}

@Override
public Object sideInput(String tagId) {
PCollectionView<?> view = sideInputMapping.get(tagId);
if (view == null) {
throw new IllegalArgumentException("calling getSideInput() with unknown view");
}
return processContext.sideInput(view);
}
});
return new Result(
residual.getKey(), cont, residual.getValue().getKey(), residual.getValue().getValue());
residual.getKey(),
cont,
residual.getValue().getKey(),
residual.getValue().getValue(),
backlogBytes);
}

private class ProcessContext extends DoFn<InputT, OutputT>.ProcessContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.auto.service.AutoService;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -281,6 +282,7 @@ public static class ProcessFn<InputT, OutputT, RestrictionT, PositionT, Watermar
processElementInvoker;

private transient @Nullable DoFnInvoker<InputT, OutputT> invoker;
private transient @Nullable Consumer<Double> backlogBytesCallback;

public ProcessFn(
DoFn<InputT, OutputT> fn,
Expand Down Expand Up @@ -323,6 +325,10 @@ public void setProcessElementInvoker(
this.processElementInvoker = invoker;
}

/**
 * Stores the consumer that should be notified of backlog byte counts.
 *
 * <p>The callback is kept in a transient, nullable field. Elsewhere in this file it is invoked
 * with {@code result.getBacklogBytes()} only when the field is non-null and that value is
 * non-negative.
 *
 * @param backlogBytesCallback consumer to receive the backlog size
 */
public void setBacklogBytesCallback(final Consumer<Double> backlogBytesCallback) {
  this.backlogBytesCallback = backlogBytesCallback;
}

/**
 * Returns the {@link DoFn} held by this instance.
 *
 * @return the value of the {@code fn} field
 */
public DoFn<InputT, OutputT> getFn() {
  return this.fn;
}
Expand Down Expand Up @@ -622,6 +628,9 @@ public String getErrorContext() {
} else {
holdState.clear();
}
if (backlogBytesCallback != null && result.getBacklogBytes() >= 0) {
backlogBytesCallback.accept(result.getBacklogBytes());
}
}

private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class Result {
private final DoFn.ProcessContinuation continuation;
private final @Nullable Instant futureOutputWatermark;
private final @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState;
private final double backlogBytes;

@SuppressFBWarnings(
value = "NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE",
Expand All @@ -50,12 +51,27 @@ public Result(
@Nullable RestrictionT residualRestriction,
DoFn.ProcessContinuation continuation,
@Nullable Instant futureOutputWatermark,
@Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) {
@Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState,
double backlogBytes) {
checkArgument(continuation != null, "continuation must not be null");
this.continuation = continuation;
this.residualRestriction = residualRestriction;
this.futureOutputWatermark = futureOutputWatermark;
this.futureWatermarkEstimatorState = futureWatermarkEstimatorState;
this.backlogBytes = backlogBytes;
}

/**
 * Creates a result without an explicit backlog size.
 *
 * <p>Delegates to the five-argument constructor, passing {@code -1.0} as the backlog bytes.
 * NOTE(review): the callback shown elsewhere in this file only fires for values {@code >= 0},
 * so a negative value appears to act as "not reported" — confirm with other consumers.
 *
 * @param residualRestriction the residual restriction, may be {@code null}
 * @param continuation the process continuation
 * @param futureOutputWatermark the future output watermark, may be {@code null}
 * @param futureWatermarkEstimatorState the future watermark estimator state, may be {@code null}
 */
public Result(
    @Nullable RestrictionT residualRestriction,
    DoFn.ProcessContinuation continuation,
    @Nullable Instant futureOutputWatermark,
    @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) {
  // No backlog size supplied by the caller: forward the negative placeholder.
  this(
      residualRestriction, continuation, futureOutputWatermark, futureWatermarkEstimatorState,
      -1.0);
}

/**
Expand All @@ -76,6 +92,10 @@ public DoFn.ProcessContinuation getContinuation() {
/**
 * Returns the future watermark estimator state given at construction.
 *
 * @return the stored state, possibly {@code null}
 */
public @Nullable WatermarkEstimatorStateT getFutureWatermarkEstimatorState() {
  return this.futureWatermarkEstimatorState;
}

/**
 * Returns the backlog bytes recorded for this result.
 *
 * <p>The four-argument constructor stores {@code -1.0} here when no value is supplied.
 *
 * @return the stored backlog byte count
 */
public double getBacklogBytes() {
  return this.backlogBytes;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,10 @@ public interface StepContext {
/**
 * Returns the {@link BundleFinalizer} for this step.
 *
 * <p>The default implementation always throws; implementations that provide bundle finalization
 * are expected to override it.
 *
 * @throws UnsupportedOperationException always, in this default implementation
 */
default BundleFinalizer bundleFinalizer() {
  final String message = "BundleFinalizer is unsupported.";
  throw new UnsupportedOperationException(message);
}

/**
 * Records the current backlog, in bytes, for this step.
 *
 * <p>Mainly used by splittable DoFn to report the size of the residual restriction. The default
 * implementation ignores the value.
 *
 * @param backlogBytes the backlog size to record
 */
default void setBacklogBytes(double backlogBytes) {
  // Intentionally a no-op by default.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@ public OffsetRange getInitialRestriction(@SuppressWarnings("unused") @Element Vo
}
}

/**
 * Test {@link DoFn} over an {@link OffsetRange} restriction that claims the first offset of its
 * current restriction, asks to be resumed, and reports a restriction's size as {@code to - from}.
 */
private static class GetSizeFn extends DoFn<Void, String> {
  /** Supplies the fixed initial restriction {@code [0, 10)}. */
  @GetInitialRestriction
  public OffsetRange getInitialRestriction() {
    return new OffsetRange(0, 10);
  }

  /** Reports the size of {@code range} as the distance between its two bounds. */
  @GetSize
  public double getSize(@Restriction OffsetRange range) {
    long span = range.getTo() - range.getFrom();
    return span;
  }

  /** Claims the start offset of the current restriction, then requests resumption. */
  @ProcessElement
  public ProcessContinuation process(RestrictionTracker<OffsetRange, Long> tracker) {
    long firstOffset = tracker.currentRestriction().getFrom();
    boolean claimed = tracker.tryClaim(firstOffset);
    assertTrue(claimed);
    return resume();
  }
}

private SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result runTest(
int totalNumOutputs,
Duration sleepBeforeFirstClaim,
Expand Down Expand Up @@ -237,4 +255,15 @@ public OffsetRange getInitialRestriction(
e.expectMessage("Output is not allowed after a failed tryClaim()");
runTest(brokenFn, new OffsetRange(0, 5));
}

/**
 * Checks that the invoker result carries the size of the residual restriction as its backlog
 * bytes.
 */
@Test
public void testBacklogBytes() throws Exception {
  final OffsetRange fullRange = new OffsetRange(0, 10);

  SplittableProcessElementInvoker<Void, String, OffsetRange, Long, Void>.Result result =
      runTest(new GetSizeFn(), fullRange);

  // GetSizeFn only claims 1 element and then takes a checkpoint, so the residual is [1, 10)
  // and GetSizeFn#getSize reports 10 - 1 = 9 for it.
  assertEquals(9.0, result.getBacklogBytes(), 0.001);
  final OffsetRange expectedResidual = new OffsetRange(1, 10);
  assertEquals(expectedResidual, result.getResidualRestriction());
}
}
Loading
Loading