Skip to content

Adds backlog reporting support for non-fnapi based SDF's.#38346

Open
acrites wants to merge 4 commits intoapache:masterfrom
acrites:sdf-backlog
Open

Adds backlog reporting support for non-fnapi based SDF's.#38346
acrites wants to merge 4 commits intoapache:masterfrom
acrites:sdf-backlog

Conversation

@acrites
Copy link
Copy Markdown
Contributor

@acrites acrites commented Apr 30, 2026

Non FnApi-based SDF's now call GetSize on the residual restriction to get source backlog. For the DataflowRunner, this backlog is sent in the CommitWorkRequest sent to the backend, which will allow for better autoscaling decisions.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a mechanism for non-FnApi based Splittable DoFns (SDFs) to report their source backlog. By invoking the GetSize method on the residual restriction, the runner can now capture the remaining work size. This information is propagated through the execution context and included in the Dataflow backend's CommitWorkRequest, which allows the Dataflow service to make more informed autoscaling decisions based on the actual remaining work.

Highlights

  • Backlog Reporting for Non-FnApi SDFs: Implemented support for non-FnApi Splittable DoFns to report source backlog by calling GetSize on the residual restriction.
  • DataflowRunner Integration: Updated DataflowRunner to include reported backlog in CommitWorkRequest, enabling improved autoscaling decisions.
  • Core Infrastructure Updates: Added a callback mechanism in SplittableProcessElementInvoker and SplittableParDoViaKeyedWorkItems to propagate backlog information to the execution context.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@acrites
Copy link
Copy Markdown
Contributor Author

acrites commented Apr 30, 2026

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements backlog reporting for Splittable DoFns (SDF) by introducing a getSize mechanism to calculate and propagate backlog bytes to the runner. Key changes include updates to the SplittableProcessElementInvoker and StepContext to handle backlog data, along with an implementation for unbounded sources. Review feedback suggests refining exception handling in the getSize method and addresses the potential brittleness of filtering out legacy default backlog values.

}

@GetSize
public double getSize(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The getSize method throws a generic Exception. While the @GetSize annotation allows this, it is generally better practice to throw more specific exceptions, such as IOException, especially since createReader and reader.start() are the primary sources of failure here.

getWorkItem().getWorkToken(),
activeReader);
activeReader = null;
} else if (backlogBytes != UnboundedReader.BACKLOG_UNKNOWN && backlogBytes != 1L) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The condition backlogBytes != 1L is used to ignore a legacy default value. However, if a user actually has a backlog of exactly 1 byte (or a value that casts to 1L), it will be incorrectly ignored. While likely negligible in most streaming scenarios, this magic number check is a bit brittle.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces the ability for Splittable DoFns (SDF) to report backlog bytes, which is essential for accurate autoscaling in runners like Dataflow. The changes include adding a backlogBytes field to the SDF Result object, updating the StepContext to allow reporting these values, and implementing getSize for UnboundedSourceAsSDFWrapperFn. Feedback is provided regarding potential data loss when casting backlog values from double to long in the Dataflow worker and a suggested optimization in Read.java to avoid expensive reader creation for empty sources.


@Override
public void setBacklogBytes(double backlogBytes) {
StreamingModeExecutionContext.this.backlogBytes = (long) backlogBytes;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The backlogBytes value is cast from double to long here. In flushState() (line 532), a value of 1L is explicitly ignored to avoid legacy default values. If a user-reported backlog is between 1.0 and 2.0 (e.g., 1.5), it will be truncated to 1L and subsequently ignored. While a backlog of 1 is typically negligible for autoscaling, this truncation could lead to unexpected behavior if the user-reported value is intended to be significant.

Comment on lines +635 to +636
try (UnboundedReader<OutputT> reader =
restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint())) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Creating and starting a reader in getSize can be expensive for some UnboundedSource implementations. As an optimization, consider checking if the source is an instance of EmptyUnboundedSource and returning 0.0 immediately to avoid unnecessary reader creation.

      if (restriction.getSource() instanceof EmptyUnboundedSource) {
        return 0.0;
      }
      try (UnboundedReader<OutputT> reader =
          restriction.getSource().createReader(pipelineOptions, restriction.getCheckpoint())) {

@github-actions
Copy link
Copy Markdown
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@github-actions github-actions Bot removed the java label May 1, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant