Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions runners/direct-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def sickbayTests = [
'org.apache.beam.sdk.io.BoundedReadFromUnboundedSourceTest.testTimeBound',
]

task needsRunnerTests(type: Test) {
tasks.register('needsRunnerTests', Test) {
group = "Verification"
description = "Runs tests that require a runner to validate that pipelines/transforms work correctly"

Expand Down Expand Up @@ -148,15 +148,15 @@ task needsRunnerTests(type: Test) {
excludeCategories 'org.apache.beam.sdk.testing.UsesBundleFinalizer'
}
testLogging {
outputs.upToDateWhen {false}
outputs.upToDateWhen { false }
showStandardStreams = true
}
}

// NOTE: This will also run 'NeedsRunner' tests, which are run in the :needsRunnerTests task as well.
// The intention of this task is to mirror the :validatesRunner configuration for other runners,
// such that the test suite can be validated on the in-process DirectRunner.
task validatesRunner(type: Test) {
tasks.register('validatesRunner', Test) {
group = "Verification"
description "Validates Direct runner"

Expand Down Expand Up @@ -217,17 +217,17 @@ createJavaExamplesArchetypeValidationTask(type: 'MobileGaming',
bqDataset: bqDataset,
pubsubTopic: pubsubTopic)

task examplesIntegrationTest(type: Test) {
tasks.register('examplesIntegrationTest', Test) {
description = "Runs examples tests on DirectRunner"

testLogging.showStandardStreams = true

String[] pipelineOptions = [
"--runner=DirectRunner",
"--runnerDeterminedSharding=false",
"--tempLocation=${tempLocation}",
"--tempRoot=${tempLocation}",
"--project=${gcpProject}",
"--runner=DirectRunner",
"--runnerDeterminedSharding=false",
"--tempLocation=${tempLocation}",
"--tempRoot=${tempLocation}",
"--project=${gcpProject}",
]
systemProperty "beamTestPipelineOptions", pipelineOptionsStringCrossPlatformHandling(pipelineOptions)

Expand All @@ -236,7 +236,7 @@ task examplesIntegrationTest(type: Test) {
classpath = configurations.examplesJavaIntegrationTest
testClassesDirs = files(project(":examples:java").sourceSets.test.output.classesDirs)
useJUnit {
filter{
filter {
// TODO (https://github.com/apache/beam/issues/21344) Fix integration Tests to run with DirectRunner: Timeout error
excludeTestsMatching 'org.apache.beam.examples.complete.TfIdfIT'
excludeTestsMatching 'org.apache.beam.examples.WindowedWordCountIT.testWindowedWordCountInBatchDynamicSharding'
Expand Down
62 changes: 32 additions & 30 deletions sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void processElement(ProcessContext context, @StateId("count") ValueState<

@Test
@Category({NeedsRunner.class, UsesUnboundedSplittableParDo.class})
public void testMatchWatchForNewFiles() throws IOException, InterruptedException {
public void testMatchWatchForNewFiles() throws IOException {
// Write some files to a "source" directory.
final Path sourcePath = tmpFolder.getRoot().toPath().resolve("source");
sourcePath.toFile().mkdir();
Expand All @@ -292,23 +292,7 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException
.continuously(
Duration.millis(100),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1))));
PCollection<MatchResult.Metadata> matchAllMetadata =
p.apply("create for matchAll new files", Create.of(watchPath.resolve("*").toString()))
.apply(
"match filename through matchAll",
FileIO.matchAll()
.continuously(
Duration.millis(100),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1))));
PCollection<MatchResult.Metadata> matchUpdatedMetadata =
p.apply(
"match updated",
FileIO.match()
.filepattern(watchPath.resolve("first").toString())
.continuously(
Duration.millis(100),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)),
true));

PCollection<MatchResult.Metadata> matchAllUpdatedMetadata =
p.apply("create for matchAll updated files", Create.of(watchPath.resolve("*").toString()))
.apply(
Expand All @@ -319,7 +303,7 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)),
true));

// write one file at the beginning. This will trigger the first output for matchAll
// write one file at the beginning. This will trigger the first output for matchMetadata
Files.copy(
sourcePath.resolve("first"),
watchPath.resolve("first"),
Expand All @@ -337,8 +321,6 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException
.apply(ParDo.of(new CopyFilesFn(sourcePath, watchPath)));

assertEquals(PCollection.IsBounded.UNBOUNDED, matchMetadata.isBounded());
assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded());
assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded());
assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllUpdatedMetadata.isBounded());

// We fetch lastModifiedTime from the files in the "source" directory to avoid a race condition
Expand All @@ -352,15 +334,6 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException
metadata(
watchPath.resolve("third"), 99, lastModifiedMillis(sourcePath.resolve("third"))));
PAssert.that(matchMetadata).containsInAnyOrder(expectedMatchNew);
PAssert.that(matchAllMetadata).containsInAnyOrder(expectedMatchNew);

List<String> expectedMatchUpdated = Arrays.asList("first", "first", "first");
PCollection<String> matchUpdatedCount =
matchUpdatedMetadata.apply(
"pick up match file name",
MapElements.into(TypeDescriptors.strings())
.via((metadata) -> metadata.resourceId().getFilename()));
PAssert.that(matchUpdatedCount).containsInAnyOrder(expectedMatchUpdated);

// Check watch for file updates. Compare only filename since modified time of copied files are
// uncontrolled.
Expand All @@ -376,6 +349,35 @@ public void testMatchWatchForNewFiles() throws IOException, InterruptedException
p.run();
}

@Test
public void testMatchWatchForNewFiles_UnboundedPCollection() {
// Additional scenarios for testMatchWatchForNewFiles. Only construct the pipeline and check
// output pcoll
final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch");

Comment thread
Abacn marked this conversation as resolved.
PCollection<MatchResult.Metadata> matchAllMetadata =
p.apply("create for matchAll new files", Create.of(watchPath + "/*"))
.apply(
"match filename through matchAll",
FileIO.matchAll()
.continuously(
Duration.millis(100),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1))));

PCollection<MatchResult.Metadata> matchUpdatedMetadata =
p.apply(
"match updated",
FileIO.match()
.filepattern(watchPath.resolve("first").toString())
.continuously(
Duration.millis(100),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)),
true));

assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded());
assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded());
}
Comment on lines +353 to +379
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This new test method has two significant issues:

  1. Style/Consistency: At line 359, watchPath + "/*" does compile (Java string concatenation with a String operand implicitly calls Path.toString()), but it is inconsistent with the rest of the file and produces a platform-dependent separator mix. It should use watchPath.resolve("*").toString() to be correct and consistent with the rest of the file.
  2. Test Failure (Abandoned Nodes): The test uses the TestPipeline rule p but does not call p.run(). By default, TestPipeline throws an exception if nodes are added but the pipeline is not executed. Since this test only checks the boundedness of the PCollections, you should disable the abandoned node enforcement using p.enableAbandonedNodeEnforcement(false).

Here is the corrected method:

  public void testMatchWatchForNewFiles_UnboundedPCollection() {
    // Additional scenarios for testMatchWatchForNewFiles. Only construct the pipeline and check
    // output pcoll
    p.enableAbandonedNodeEnforcement(false);
    final Path watchPath = tmpFolder.getRoot().toPath().resolve("watch");

    PCollection<MatchResult.Metadata> matchAllMetadata =
        p.apply("create for matchAll new files", Create.of(watchPath.resolve("*").toString()))
            .apply(
                "match filename through matchAll",
                FileIO.matchAll()
                    .continuously(
                        Duration.millis(100),
                        Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1))));

    PCollection<MatchResult.Metadata> matchUpdatedMetadata =
        p.apply(
            "match updated",
            FileIO.match()
                .filepattern(watchPath.resolve("first").toString())
                .continuously(
                    Duration.millis(100),
                    Watch.Growth.afterTimeSinceNewOutput(Duration.standardSeconds(1)),
                    true));

    assertEquals(PCollection.IsBounded.UNBOUNDED, matchAllMetadata.isBounded());
    assertEquals(PCollection.IsBounded.UNBOUNDED, matchUpdatedMetadata.isBounded());
  }


@Test
@Category(NeedsRunner.class)
public void testRead() throws IOException {
Expand Down
Loading