diff --git a/runners/direct-java/build.gradle b/runners/direct-java/build.gradle index 1ab702da3213..328b77875dbf 100644 --- a/runners/direct-java/build.gradle +++ b/runners/direct-java/build.gradle @@ -116,7 +116,7 @@ def sickbayTests = [ 'org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest.testTimeBound', ] -task needsRunnerTests(type: Test) { +tasks.register('needsRunnerTests', Test) { group = "Verification" description = "Runs tests that require a runner to validate that pipelines/transforms work correctly" @@ -148,7 +148,7 @@ task needsRunnerTests(type: Test) { excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer' } testLogging { - outputs.upToDateWhen {false} + outputs.upToDateWhen { false } showStandardStreams = true } } @@ -156,7 +156,7 @@ task needsRunnerTests(type: Test) { // NOTE: This will also run 'NeedsRunner' tests, which are run in the :needsRunnerTests task as well. // The intention of this task is to mirror the :validatesRunner configuration for other runners, // such that the test suite can be validated on the in-process DirectRunner. -task validatesRunner(type: Test) { +tasks.register('validatesRunner', Test) { group = "Verification" description "Validates Direct runner" @@ -217,17 +217,17 @@ createJavaExamplesArchetypeValidationTask(type: 'MobileGaming', bqDataset: bqDataset, pubsubTopic: pubsubTopic) -task examplesIntegrationTest(type: Test) { +tasks.register('examplesIntegrationTest', Test) { description = "Runs examples tests on DirectRunner" testLogging.showStandardStreams = true String[] pipelineOptions = [ - "--runner=DirectRunner", - "--runnerDeterminedSharding=false", - "--tempLocation=${tempLocation}", - "--tempRoot=${tempLocation}", - "--project=${gcpProject}", + "--runner=DirectRunner", + "--runnerDeterminedSharding=false", + "--tempLocation=${tempLocation}", + "--tempRoot=${tempLocation}", + "--project=${gcpProject}", ] systemProperty "beamTestPipelineOptions", pipelineOptionsStringCrossPlatformHandling(pipelineOptions) @@ -236,7 +236,7 @@ task examplesIntegrationTest(type: Test) { classpath = configurations.examplesJavaIntegrationTest testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs) useJUnit { - filter{ + filter { // TODO (https://github.com/apache/beam/issues/21344) Fix integration Tests to run with DirectRunner: Timeout error excludeTestsMatching 'org.apache.beam.examples.complete.TfIdfIT' excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding' diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index dffc4943bfab..3db4c4a4d004 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -273,7 +273,7 @@ public void processElement(ProcessContext context, @StateId("count") ValueState< @Test @Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class}) - public void testMatchWatchForNewFiles() throws IOException, InterruptedException { + public void testMatchWatchForNewFiles() throws IOException { // Write some files to a "source" directory. final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source"); sourcePath.toFile().mkdir(); @@ -292,23 +292,7 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException .continuously( Duration.millis(100), Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); - PCollection matchAllMetadata = - p.apply("create for matchAll new files", Create.of(watchPath.resolve("*").toString())) - .apply( - "match filename through matchAll", - FileIO.matchAll() - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); - PCollection matchUpdatedMetadata = - p.apply( - "match updated", - FileIO.match() - .filepattern(watchPath.resolve("first").toString()) - .continuously( - Duration.millis(100), - Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), - true)); + PCollection matchAllUpdatedMetadata = p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString())) .apply( @@ -319,7 +303,7 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), true)); - // write one file at the beginning. This will trigger the first output for matchAll + // write one file at the beginning. This will trigger the first output for matchMetadata Files.copy( sourcePath.resolve("first"), watchPath.resolve("first"), @@ -337,8 +321,6 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException .apply(ParDo.of(new CopyFilesFn(sourcePath, watchPath))); assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded()); - assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded()); - assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded()); assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllUpdatedMetadata.isBounded()); // We fetch lastModifiedTime from the files in the "source" directory to avoid a race condition @@ -352,15 +334,6 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException metadata( watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third")))); PAssert.that(matchMetadata).containsInAnyOrder(expectedMatchNew); - PAssert.that(matchAllMetadata).containsInAnyOrder(expectedMatchNew); - - List expectedMatchUpdated = Arrays.asList("first", "first", "first"); - PCollection matchUpdatedCount = - matchUpdatedMetadata.apply( - "pick up match file name", - MapElements.into(TypeDescriptors.strings()) - .via((metadata) -> metadata.resourceId().getFilename())); - PAssert.that(matchUpdatedCount).containsInAnyOrder(expectedMatchUpdated); // Check watch for file updates. Compare only filename since modified time of copied files are // uncontrolled. @@ -376,6 +349,35 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException p.run(); } + @Test + public void testMatchWatchForNewFiles_UnboundedPCollection() { + // Additional scenarios for testMatchWatchForNewFiles. Only construct the pipeline and check + // output pcoll + final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch"); + + PCollection matchAllMetadata = + p.apply("create for matchAll new files", Create.of(watchPath + "/*")) + .apply( + "match filename through matchAll", + FileIO.matchAll() + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)))); + + PCollection matchUpdatedMetadata = + p.apply( + "match updated", + FileIO.match() + .filepattern(watchPath.resolve("first").toString()) + .continuously( + Duration.millis(100), + Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)), + true)); + + assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded()); + assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded()); + } + @Test @Category(NeedsRunner.class) public void testRead() throws IOException {