Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/tools/src/java/org/apache/orc/tools/Driver.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static void main(String[] args) throws Exception {
System.err.println(" data - print the data from the ORC file");
System.err.println(" json-schema - scan JSON files to determine their schema");
System.err.println(" key - print information about the keys");
System.err.println(" merge - merge multiple ORC files into a single ORC file");
System.err.println(" merge - merge multiple ORC files into one or more ORC files");
System.err.println(" meta - print the metadata about the ORC file");
System.err.println(" scan - scan the ORC file");
System.err.println(" sizes - list size on disk of each column");
Expand Down
140 changes: 132 additions & 8 deletions java/tools/src/java/org/apache/orc/tools/MergeFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@
import java.util.Set;

/**
* Merge multiple ORC files that all have the same schema into a single ORC file.
* Merge multiple ORC files that all have the same schema into one or more ORC files.
* When {@code --maxSize} is specified, the tool splits output into multiple part files
* under the given output directory, each not exceeding the specified size threshold.
*/
public class MergeFiles {

static final String PART_FILE_FORMAT = "part-%05d.orc";

public static void main(Configuration conf, String[] args) throws Exception {
Options opts = createOptions();
CommandLine cli = new DefaultParser().parse(opts, args);
Expand All @@ -56,27 +60,60 @@ public static void main(Configuration conf, String[] args) throws Exception {
}
boolean ignoreExtension = cli.hasOption("ignoreExtension");

List<Path> inputFiles = new ArrayList<>();
OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf);
long maxSizeBytes = 0;
if (cli.hasOption("maxSize")) {
try {
maxSizeBytes = Long.parseLong(cli.getOptionValue("maxSize"));
} catch (NumberFormatException e) {
System.err.println("--maxSize requires a numeric value in bytes.");
System.exit(1);
}
if (maxSizeBytes <= 0) {
System.err.println("--maxSize must be a positive number of bytes.");
System.exit(1);
}
}
Comment thread
QianyongY marked this conversation as resolved.

List<LocatedFileStatus> inputStatuses = new ArrayList<>();
String[] files = cli.getArgs();
for (String root : files) {
Path rootPath = new Path(root);
FileSystem fs = rootPath.getFileSystem(conf);
for (RemoteIterator<LocatedFileStatus> itr = fs.listFiles(rootPath, true); itr.hasNext(); ) {
LocatedFileStatus status = itr.next();
if (status.isFile() && (ignoreExtension || status.getPath().getName().endsWith(".orc"))) {
inputFiles.add(status.getPath());
inputStatuses.add(status);
}
}
}
Comment thread
cxzl25 marked this conversation as resolved.
Outdated
if (inputFiles.isEmpty()) {
if (inputStatuses.isEmpty()) {
System.err.println("No files found.");
System.exit(1);
}

List<Path> mergedFiles = OrcFile.mergeFiles(
new Path(outputFilename), writerOptions, inputFiles);
List<Path> inputFiles = new ArrayList<>(inputStatuses.size());
for (LocatedFileStatus s : inputStatuses) {
inputFiles.add(s.getPath());
}
Comment thread
QianyongY marked this conversation as resolved.
Outdated

OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf);

if (maxSizeBytes > 0) {
mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, inputFiles,
new Path(outputFilename), maxSizeBytes);
} else {
mergeIntoSingleFile(writerOptions, inputFiles, new Path(outputFilename), outputFilename);
}
}

/**
* Original single-output behavior (no --maxSize).
*/
private static void mergeIntoSingleFile(OrcFile.WriterOptions writerOptions,
List<Path> inputFiles,
Path outputPath,
String outputFilename) throws Exception {
List<Path> mergedFiles = OrcFile.mergeFiles(outputPath, writerOptions, inputFiles);

List<Path> unSuccessMergedFiles = new ArrayList<>();
if (mergedFiles.size() != inputFiles.size()) {
Expand All @@ -100,11 +137,88 @@ public static void main(Configuration conf, String[] args) throws Exception {
}
}

/**
* Multi-output behavior when --maxSize is set.
* Input files are grouped by cumulative raw file size; each group is merged into
* a separate part file (part-00000.orc, part-00001.orc, ...) under outputDir.
* A single file whose size already exceeds maxSizeBytes is placed in its own part.
*/
private static void mergeIntoMultipleFiles(Configuration conf,
OrcFile.WriterOptions writerOptions,
List<LocatedFileStatus> inputStatuses,
List<Path> inputFiles,
Path outputDir,
long maxSizeBytes) throws Exception {
FileSystem outFs = outputDir.getFileSystem(conf);
if (outFs.exists(outputDir)) {
if (!outFs.getFileStatus(outputDir).isDirectory()) {
throw new IllegalArgumentException(
"Output path already exists and is not a directory: " + outputDir);
}
if (outFs.listStatus(outputDir).length > 0) {
throw new IllegalArgumentException(
"Output directory must be empty for multi-file merge: " + outputDir);
Comment thread
QianyongY marked this conversation as resolved.
Outdated
}
} else if (!outFs.mkdirs(outputDir)) {
throw new IllegalStateException("Failed to create output directory: " + outputDir);
}
Comment thread
cxzl25 marked this conversation as resolved.
Outdated

// Group input files into batches where each batch's total size <= maxSizeBytes.
List<List<Path>> batches = new ArrayList<>();
List<Path> currentBatch = new ArrayList<>();
long currentBatchSize = 0;

for (LocatedFileStatus status : inputStatuses) {
long fileSize = status.getLen();
if (!currentBatch.isEmpty() && currentBatchSize + fileSize > maxSizeBytes) {
Comment thread
QianyongY marked this conversation as resolved.
Outdated
batches.add(currentBatch);
currentBatch = new ArrayList<>();
currentBatchSize = 0;
}
currentBatch.add(status.getPath());
currentBatchSize += fileSize;
}
if (!currentBatch.isEmpty()) {
batches.add(currentBatch);
}

int totalMerged = 0;
List<Path> allUnmerged = new ArrayList<>();

for (int i = 0; i < batches.size(); i++) {
List<Path> batch = batches.get(i);
Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT, i));
List<Path> merged = OrcFile.mergeFiles(partOutput, writerOptions.clone(), batch);
totalMerged += merged.size();

if (merged.size() != batch.size()) {
Set<Path> mergedSet = new HashSet<>(merged);
for (Path p : batch) {
if (!mergedSet.contains(p)) {
allUnmerged.add(p);
}
}
}
}

Comment thread
cxzl25 marked this conversation as resolved.
if (!allUnmerged.isEmpty()) {
System.err.println("List of files that could not be merged:");
allUnmerged.forEach(path -> System.err.println(path.toString()));
}

System.out.printf(
"Output path: %s, Input files size: %d, Merge files size: %d, Output files: %d%n",
outputDir, inputFiles.size(), totalMerged, batches.size());
if (!allUnmerged.isEmpty()) {
System.exit(1);
}
}

private static Options createOptions() {
Options result = new Options();
result.addOption(Option.builder("o")
.longOpt("output")
.desc("Output filename")
.desc("Output filename (single-file mode) or output directory (multi-file mode)")
.hasArg()
.build());

Expand All @@ -113,6 +227,16 @@ private static Options createOptions() {
.desc("Ignore ORC file extension")
.build());

result.addOption(Option.builder("m")
.longOpt("maxSize")
Comment thread
QianyongY marked this conversation as resolved.
.desc("Maximum cumulative input file size in bytes per output ORC file. When set, "
+ "--output is treated as an output directory and merged files are written as "
+ "part-00000.orc, part-00001.orc, etc. Input files are grouped at file "
+ "boundaries so an individual file larger than this threshold will still be "
+ "placed in its own part.")
.hasArg()
.build());
Comment thread
QianyongY marked this conversation as resolved.

result.addOption(Option.builder("h")
.longOpt("help")
.desc("Print help message")
Expand Down
82 changes: 82 additions & 0 deletions java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class TestMergeFiles implements TestConf {
@BeforeEach
public void openFileSystem() throws Exception {
fs = FileSystem.getLocal(conf);
fs.delete(workDir, true);
fs.mkdirs(workDir);
fs.deleteOnExit(workDir);
testFilePath = new Path(workDir + File.separator + "TestMergeFiles.testMerge.orc");
Expand Down Expand Up @@ -107,4 +108,85 @@ public void testMerge() throws Exception {
assertEquals(10000 + 20000, reader.getNumberOfRows());
}
}

/**
* Verifies that --maxSize splits input files into multiple part files under the output
* directory. Three source files are created; a tight size threshold forces them to be
* written into at least two part files.
*/
@Test
public void testMergeWithMaxSize() throws Exception {
Comment thread
QianyongY marked this conversation as resolved.
TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>");

// Create 3 source ORC files.
String[] sourceNames = {
workDir + File.separator + "ms-1.orc",
workDir + File.separator + "ms-2.orc",
workDir + File.separator + "ms-3.orc"
};
int[] rowCounts = {5000, 5000, 5000};
for (int f = 0; f < sourceNames.length; f++) {
Writer writer = OrcFile.createWriter(new Path(sourceNames[f]),
OrcFile.writerOptions(conf).setSchema(schema));
VectorizedRowBatch batch = schema.createRowBatch();
LongColumnVector x = (LongColumnVector) batch.cols[0];
BytesColumnVector y = (BytesColumnVector) batch.cols[1];
for (int r = 0; r < rowCounts[f]; ++r) {
int row = batch.size++;
x.vector[row] = r;
byte[] buffer = ("val-" + r).getBytes();
y.setRef(row, buffer, 0, buffer.length);
if (batch.size == batch.getMaxSize()) {
writer.addRowBatch(batch);
batch.reset();
}
}
if (batch.size != 0) {
writer.addRowBatch(batch);
}
writer.close();
}

// Measure the size of the first source file to compute a threshold that forces a split.
long singleFileSize = fs.getFileStatus(new Path(sourceNames[0])).getLen();
// Threshold: slightly larger than one file so at most one file fits per part.
long maxSize = singleFileSize + 1;
Comment thread
QianyongY marked this conversation as resolved.
Outdated

Path outputDir = new Path(workDir + File.separator + "merge-multi-out");
fs.delete(outputDir, true);

PrintStream origOut = System.out;
ByteArrayOutputStream myOut = new ByteArrayOutputStream();
System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
try {
MergeFiles.main(conf, new String[]{workDir.toString(),
"--output", outputDir.toString(),
"--maxSize", String.valueOf(maxSize)});
System.out.flush();
} finally {
System.setOut(origOut);
}
String output = myOut.toString(StandardCharsets.UTF_8);
System.out.println(output);
Copy link

Copilot AI Apr 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests unconditionally print the captured tool output to stdout, which can make CI logs noisy and harder to read. Consider removing this println (or only printing on assertion failure) since the assertions already include helpful context messages.

Suggested change
System.out.println(output);

Copilot uses AI. Check for mistakes.

assertTrue(output.contains("Input files size: 3"), "Should report 3 input files");
assertTrue(output.contains("Merge files size: 3"), "All 3 files should be merged");
assertTrue(fs.isDirectory(outputDir), "Output directory should be created");

// Verify that multiple part files were created and total row count is correct.
long totalRows = 0;
int partCount = 0;
for (int i = 0; ; i++) {
Path part = new Path(outputDir, String.format(MergeFiles.PART_FILE_FORMAT, i));
if (!fs.exists(part)) {
break;
}
partCount++;
try (Reader reader = OrcFile.createReader(part, OrcFile.readerOptions(conf))) {
totalRows += reader.getNumberOfRows();
}
}
assertTrue(partCount > 1, "Expected more than one output part file, got: " + partCount);
assertEquals(5000 + 5000 + 5000, totalRows, "Total row count across all parts should match");
}
}
30 changes: 28 additions & 2 deletions site/_docs/java-tools.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ The subcommands for the tools are:
* data - print the data of an ORC file
* json-schema (since ORC 1.4) - determine the schema of JSON documents
* key (since ORC 1.5) - print information about the encryption keys
* merge (since ORC 2.0.1) - merge multiple ORC files into a single ORC file
* merge (since ORC 2.0.1) - merge multiple ORC files into one or more ORC files
* meta - print the metadata of an ORC file
* scan (since ORC 1.3) - scan the data for benchmarking
* sizes (since ORC 1.7.2) - list size on disk of each column
Expand Down Expand Up @@ -356,13 +356,39 @@ ______________________________________________________________________

## Java Merge

The merge command can merge multiple ORC files that all have the same schema into a single ORC file.
The merge command can merge multiple ORC files that all have the same schema. By default
it writes a single output file. If `--maxSize` is set, `--output` is treated as a directory
and the tool writes multiple part files (`part-00000.orc`, `part-00001.orc`, …) under it.
Input files are grouped using their on-disk sizes so that each part’s total input size
does not exceed the given threshold (a single input file larger than the threshold is still
merged into its own part).

`-h,--help`
: Print help

`-i,--ignoreExtension`
: Include files that do not end in `.orc`

`-m,--maxSize <bytes>`
: Maximum size in bytes for each output part; enables multi-file output under `--output`

`-o,--output <path>`
: Output ORC filename (single-file mode) or output directory (when `--maxSize` is set)

Comment thread
cxzl25 marked this conversation as resolved.
Merge into one ORC file:

~~~ shell
% java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/merged.orc /path/to/input_orc/
______________________________________________________________________
~~~

Merge into multiple ORC files under a directory (each part bounded by size):

~~~ shell
% java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/out_dir/ --maxSize 1073741824 /path/to/input_orc/
______________________________________________________________________
~~~

## Java Version

The version command prints the version of this ORC tool.
Loading