Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,20 +187,26 @@ Here is an example of what the comment could look like:
> - Processor named `UpdateAttribute` is configured with 4 concurrent tasks

#### Flow Changes

**`MyExample`** — 8 changes
- The destination of a connection has changed from `UpdateAttribute` to `InvokeHTTP`
- A self-loop connection `[success]` has been added on `UpdateAttribute`
- A connection `[success]` from `My Generate FlowFile Processor` to `UpdateAttribute` has been added
- A Processor of type `GenerateFlowFile` named `GenerateFlowFile` has been renamed from `GenerateFlowFile` to `My Generate FlowFile Processor`
- In Processor of type `GenerateFlowFile` named `GenerateFlowFile`, the Scheduling Strategy changed from `TIMER_DRIVEN` to `CRON_DRIVEN`
- A Parameter Context named `Test Parameter Context` has been added
- The Parameter Context `Test Parameter Context` with parameters `{addedParam=newValue}` has been added to the process group `TestingFlowDiff`
- A Processor of type `UpdateAttribute` named `UpdateAttribute` has been removed
- In Processor of type `GenerateFlowFile` named `GenerateFlowFile`, the Run Schedule changed from `1 min` to `* * * * * ?`
- A Parameter Context named `Another one to delete` has been added

**`MyExample > Ingestion`** — 1 change
- A Processor of type `UpdateAttribute` named `UpdateAttribute` has been added with the configuration [`ALL` nodes, `4` concurrent tasks, `0ms` run duration, `WARN` bulletin level, `TIMER_DRIVEN` (`0 sec`), `30 sec` penalty duration, `1 sec` yield duration] and the below properties:
- `Store State` = `Do not store state`
- `canonical-value-lookup-cache-size` = `100`

**`(Parameter Contexts)`** — 2 changes
- A Parameter Context named `Test Parameter Context` has been added
- A Parameter Context named `Another one to delete` has been added

#### Bundle Changes
- The bundle `org.apache.nifi:nifi-standard-nar` has been changed from version `2.1.0` to version `2.2.0`
```
94 changes: 79 additions & 15 deletions flow-diff/src/main/java/com/snowflake/openflow/FlowDiff.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,41 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin

System.out.println("#### Flow Changes");

// Phase 1: collect bundle changes and group remaining diffs by process group path
final Map<String, List<FlowDifference>> groupedDiffs = new LinkedHashMap<>();
for (FlowDifference diff : diffs) {
switch (diff.getDifferenceType()) {
case BUNDLE_CHANGED:
Bundle before = (Bundle) diff.getValueA();
Bundle after = (Bundle) diff.getValueB();
bundleChanges.add("- The bundle `"
+ before.getGroup() + ":" + before.getArtifact()
+ "` has been changed from version "
+ "`" + before.getVersion() + "` to version `" + after.getVersion() + "`");
break;
case SIZE_CHANGED, STYLE_CHANGED, POSITION_CHANGED, BENDPOINTS_CHANGED, ZINDEX_CHANGED:
// no need to print these, they are not relevant for the user
break;
default:
groupedDiffs.computeIfAbsent(getGroupPathForDiff(diff), k -> new ArrayList<>()).add(diff);
}
}

// Phase 2: print each group with a header, parameter contexts section last
final List<String> sortedPaths = new ArrayList<>(groupedDiffs.keySet());
sortedPaths.sort((a, b) -> {
if (a.equals(PARAMETER_CONTEXTS_SECTION)) return 1;
if (b.equals(PARAMETER_CONTEXTS_SECTION)) return -1;
return a.compareTo(b);
});

for (final String groupPath : sortedPaths) {
final List<FlowDifference> groupDiffs = groupedDiffs.get(groupPath);
final int count = groupDiffs.size();
System.out.println("");
System.out.println("**`" + groupPath + "`** \u2014 " + count + (count == 1 ? " change" : " changes"));

for (FlowDifference diff : groupDiffs) {

switch (diff.getDifferenceType()) {
case COMPONENT_ADDED: {
Expand All @@ -212,10 +246,8 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin
printConfigurableExtensionProperties(proc);
} else if (diff.getComponentB().getComponentType().equals(ComponentType.CONTROLLER_SERVICE)) {
final VersionedControllerService cs = (VersionedControllerService) diff.getComponentB();
final VersionedProcessGroup csPg = processGroups.get(cs.getGroupIdentifier());
final String pgName = csPg == null ? cs.getGroupIdentifier() : csPg.getName();
System.out.println("- A " + printComponent(diff.getComponentB())
+ " has been added in Process Group `" + pgName + "` with the below properties:");
+ " has been added with the below properties:");
printConfigurableExtensionProperties(cs);
} else if (diff.getComponentB().getComponentType().equals(ComponentType.LABEL)) {
final VersionedLabel label = (VersionedLabel) diff.getComponentB();
Expand Down Expand Up @@ -344,14 +376,6 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin
+ ", the Scheduling Strategy changed from `" + diff.getValueA() + "` to `" + diff.getValueB() + "`");
break;
}
case BUNDLE_CHANGED:
Bundle before = (Bundle) diff.getValueA();
Bundle after = (Bundle) diff.getValueB();
bundleChanges.add("- The bundle `"
+ before.getGroup() + ":" + before.getArtifact()
+ "` has been changed from version "
+ "`" + before.getVersion() + "` to version `" + after.getVersion() + "`");
break;
case NAME_CHANGED: {
System.out.println("- A " + printComponent(diff.getComponentA())
+ " has been renamed from `" + diff.getValueA() + "` to `" + diff.getValueB() + "`");
Expand Down Expand Up @@ -518,9 +542,6 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin
System.out.println("- In " + printComponent(diff.getComponentA()) + ", the sensitivity of the property `"
+ diff.getFieldName().get() + "` changed from `" + diff.getValueA() + "` to `" + diff.getValueB() + "`");
break;
case SIZE_CHANGED, STYLE_CHANGED, POSITION_CHANGED, BENDPOINTS_CHANGED, ZINDEX_CHANGED:
// no need to print these, they are not relevant for the user
break;
case FLOWFILE_CONCURRENCY_CHANGED:
System.out.println("- In " + printComponent(diff.getComponentB())
+ ", the FlowFile Concurrency changed from `" + diff.getValueA() + "` to `" + diff.getValueB() + "`");
Expand All @@ -546,7 +567,8 @@ private static boolean executeFlowDiffForOneFlow(final String pathA, final Strin
System.out.println(" - " + diff.getFieldName());
break;
}
}
} // end inner diff loop
} // end group loop

if (bundleChanges.size() > 0) {
System.out.println("");
Expand Down Expand Up @@ -589,6 +611,12 @@ public static Set<FlowDifference> getDiff(final String pathA, final String pathB
sanitizeProcessGroup(snapshotB.getFlowSnapshot().getFlowContents());

processGroups = new HashMap<>();
// Register A's groups first so B can overwrite — B takes display-name priority
if (snapshotA != null) {
final VersionedProcessGroup rootPGA = snapshotA.getFlowSnapshot().getFlowContents();
processGroups.put(rootPGA.getIdentifier(), rootPGA);
registerProcessGroups(rootPGA);
}
VersionedProcessGroup rootPG = snapshotB.getFlowSnapshot().getFlowContents();
processGroups.put(rootPG.getIdentifier(), rootPG);
registerProcessGroups(rootPG);
Expand Down Expand Up @@ -667,6 +695,42 @@ private static void registerProcessGroups(final VersionedProcessGroup rootPG) {
}
}

static final String PARAMETER_CONTEXTS_SECTION = "(Parameter Contexts)";

static String buildProcessGroupPath(final String startGroupId) {
if (startGroupId == null) {
return PARAMETER_CONTEXTS_SECTION;
}
final List<String> pathParts = new ArrayList<>();
String currentId = startGroupId;
final Set<String> visited = new HashSet<>();
while (currentId != null && !visited.contains(currentId)) {
visited.add(currentId);
final VersionedProcessGroup pg = processGroups.get(currentId);
if (pg == null) {
pathParts.add(0, currentId);
break;
}
final String name = pg.getName();
pathParts.add(0, (name != null && !name.isEmpty()) ? name : currentId);
currentId = pg.getGroupIdentifier();
}
return pathParts.isEmpty() ? startGroupId : String.join(" > ", pathParts);
}

static String getGroupPathForDiff(final FlowDifference diff) {
final VersionedComponent component = diff.getComponentA() != null ? diff.getComponentA() : diff.getComponentB();
if (component == null) {
return PARAMETER_CONTEXTS_SECTION;
}
if (component instanceof VersionedParameterContext) {
return PARAMETER_CONTEXTS_SECTION;
}
final String groupId = component.getGroupIdentifier();
// groupId is null when the component is the root process group itself
return groupId != null ? buildProcessGroupPath(groupId) : buildProcessGroupPath(component.getIdentifier());
}

private static void sanitizeProcessGroup(final VersionedProcessGroup group) {
if (group == null) {
return;
Expand Down
70 changes: 70 additions & 0 deletions flow-diff/src/test/java/com/snowflake/openflow/FlowDiffTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
import org.apache.nifi.registry.flow.diff.FlowDifference;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Set;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class FlowDiffTest {
Expand Down Expand Up @@ -129,4 +133,70 @@ void testCheckstyleFailExitCode() throws IOException {
"true" });
assertEquals(2, exitCode);
}

@Test
void testNestedGroupsHaveSeparateSections() throws IOException {
final Set<FlowDifference> diffs = FlowDiff.getDiff(
"src/test/resources/flow_v8_nested_groups_before.json",
"src/test/resources/flow_v8_nested_groups_after.json",
false, null);
// One PROPERTY_CHANGED in Group A and one COMPONENT_ADDED in Group B
assertTrue(diffs.stream().anyMatch(d -> d.getDifferenceType().equals(DifferenceType.PROPERTY_CHANGED)));
assertTrue(diffs.stream().anyMatch(d -> d.getDifferenceType().equals(DifferenceType.COMPONENT_ADDED)));
assertEquals(2, diffs.size());
}

@Test
void testGroupedOutputContainsGroupHeaders() throws IOException {
final ByteArrayOutputStream buf = new ByteArrayOutputStream();
final PrintStream orig = System.out;
System.setOut(new PrintStream(buf, true, StandardCharsets.UTF_8));
try {
FlowDiff.run(new String[]{
"src/test/resources/flow_v8_nested_groups_before.json",
"src/test/resources/flow_v8_nested_groups_after.json",
"", "", ""
});
} finally {
System.setOut(orig);
}
final String output = buf.toString(StandardCharsets.UTF_8);
// Both group paths must appear as bold headers
assertTrue(output.contains("**`NestedGroupsFlow > Group A`**"), "Group A header missing");
assertTrue(output.contains("**`NestedGroupsFlow > Group B`**"), "Group B header missing");
// Group A comes before Group B (alphabetical sort)
assertTrue(output.indexOf("Group A") < output.indexOf("Group B"), "Groups not in alphabetical order");
// Each section shows a change count
assertTrue(output.contains("1 change"), "Change count missing");
// The property change line belongs under Group A
final int groupAPos = output.indexOf("**`NestedGroupsFlow > Group A`**");
final int groupBPos = output.indexOf("**`NestedGroupsFlow > Group B`**");
final int propertyChangeLine = output.indexOf("File Size");
assertTrue(propertyChangeLine > groupAPos && propertyChangeLine < groupBPos,
"File Size change should appear under Group A");
// The added processor line belongs under Group B
final int addedProcessorLine = output.indexOf("UpdateAttribute");
assertTrue(addedProcessorLine > groupBPos, "UpdateAttribute addition should appear under Group B");
}

@Test
void testSingleGroupOutputHasGroupHeader() throws IOException {
final ByteArrayOutputStream buf = new ByteArrayOutputStream();
final PrintStream orig = System.out;
System.setOut(new PrintStream(buf, true, StandardCharsets.UTF_8));
try {
FlowDiff.run(new String[]{
"src/test/resources/flow_v1_initial.json",
"src/test/resources/flow_v2_added_component.json",
"", "", ""
});
} finally {
System.setOut(orig);
}
final String output = buf.toString(StandardCharsets.UTF_8);
// Single-group flow still gets a group header
assertTrue(output.contains("**`TestingFlowDiff`**"), "Root process group header missing");
// No Parameter Contexts section for this diff (no param context changes)
assertFalse(output.contains(FlowDiff.PARAMETER_CONTEXTS_SECTION), "Unexpected parameter contexts section");
}
}
Loading
Loading