From c31534e59e92ee1d3ac7f42dfbc75c28eeb303b8 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 28 Apr 2026 12:34:35 -0700 Subject: [PATCH 1/2] Add staged package hashes --- .../org/apache/beam/runners/dataflow/util/PackageUtil.java | 3 +++ .../apache/beam/runners/dataflow/util/PackageUtilTest.java | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 31c9a5f3ce0d..e33fb594e272 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -438,6 +438,7 @@ public static PackageAttributes forFileToStage( .toString(); destination.setLocation(resourcePath); destination.setName(dest); + destination.setSha256(hash); return new AutoValue_PackageUtil_PackageAttributes( file, null, destination, file.length(), hash); } @@ -456,6 +457,7 @@ public static PackageAttributes forBytesToStage( DataflowPackage targetPackage = new DataflowPackage(); targetPackage.setName(target); targetPackage.setLocation(resourcePath); + targetPackage.setSha256(hashCode.toString()); return new AutoValue_PackageUtil_PackageAttributes( null, bytes, targetPackage, size, hashCode.toString()); @@ -465,6 +467,7 @@ public PackageAttributes withPackageName(String overridePackageName) { DataflowPackage newDestination = new DataflowPackage(); newDestination.setName(overridePackageName); newDestination.setLocation(getDestination().getLocation()); + newDestination.setSha256(getHash()); return new AutoValue_PackageUtil_PackageAttributes( getSource(), getBytes(), newDestination, getSize(), getHash()); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java index 670032425eb2..7e7ef6a8345a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/PackageUtilTest.java @@ -191,6 +191,8 @@ public void testFileWithExtensionPackageNamingAndSize() throws Exception { assertThat(target.getName(), endsWith(".txt")); assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); assertThat(attr.getSize(), equalTo((long) contents.length())); + String expectedHash = Files.asByteSource(tmpFile).hash(Hashing.sha256()).toString(); + assertThat(target.getSha256(), equalTo(expectedHash)); } @Test @@ -299,6 +301,8 @@ public void testPackageUploadWithFileSucceeds() throws Exception { assertThat(target.getName(), endsWith(".txt")); assertThat(target.getLocation(), equalTo(STAGING_PATH + target.getName())); + String expectedHash = Files.asByteSource(tmpFile).hash(Hashing.sha256()).toString(); + assertThat(target.getSha256(), equalTo(expectedHash)); assertThat( new LineReader(Channels.newReader(pipe.source(), StandardCharsets.UTF_8.name())).readLine(), equalTo(contents)); From 57ab03ff3016bd25264b4535e05e84b8ac06965d Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 29 Apr 2026 13:04:31 -0700 Subject: [PATCH 2/2] Add package hashes --- .../runners/dataflow/internal/apiclient.py | 9 ++++++--- .../runners/dataflow/internal/apiclient_test.py | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 29cb36071488..44f34ac3a687 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -171,14 +171,15 @@ def __init__( self.proto.experiments.append(experiment) # Worker pool(s) information. package_descriptors = [] - for package in packages: + for package, sha256 in packages: package_descriptors.append( dataflow.Package( location='%s/%s' % ( self.google_cloud_options.staging_location.replace( 'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE), package), - name=package)) + name=package, + sha256=sha256)) pool = dataflow.WorkerPool( kind='local' if self.local else 'harness', @@ -649,7 +650,9 @@ def _stage_resources(self, pipeline, options): resource_stager = _LegacyDataflowStager(self) staged_resources = resource_stager.stage_job_resources( resources, staging_location=google_cloud_options.staging_location) - return staged_resources + + name_to_hash = {remote_name: sha256 for _, remote_name, sha256 in resources} + return [(name, name_to_hash.get(name)) for name in staged_resources] def stage_file( self, diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 66b1c8e1e5bb..5fbf805d624d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -96,6 +96,22 @@ def test_pipeline_url(self): self.fail('No pipeline_url found in %s' % recovered_options) self.assertEqual(pipeline_url.string_value, FAKE_PIPELINE_URL) + def test_environment_packages_with_hash(self): + pipeline_options = PipelineOptions([ + '--temp_location', + 'gs://any-location/temp' + ]) + packages = [('package1', 'hash1'), ('package2', 'hash2')] + env = apiclient.Environment( + packages, + pipeline_options, + '2.0.0', + FAKE_PIPELINE_URL) + self.assertEqual(len(env.proto.workerPools[0].packages), 2) + self.assertEqual(env.proto.workerPools[0].packages[0].name, 'package1') + self.assertEqual(env.proto.workerPools[0].packages[0].sha256, 'hash1') + self.assertEqual(env.proto.workerPools[0].packages[1].name, 'package2') + self.assertEqual(env.proto.workerPools[0].packages[1].sha256, 'hash2') def test_set_network(self): pipeline_options = PipelineOptions([