diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java index b4da42d8f03..0130cbbab7e 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java @@ -221,6 +221,11 @@ public final class ReconServerConfigKeys { "ozone.recon.dn.metrics.collection.timeout"; public static final String OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT = "10m"; + public static final String OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT = + "ozone.recon.dn.metrics.collection.thread.count"; + public static final int OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT = + Runtime.getRuntime().availableProcessors() * 2; + /** * Application-level ceiling on the number of ContainerIDs fetched from SCM * per RPC call during container sync. 
The effective batch size is diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/TarExtractor.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/TarExtractor.java index b3bd17bdece..49f2cdc40a3 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/TarExtractor.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/TarExtractor.java @@ -34,10 +34,9 @@ import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; @@ -73,7 +72,7 @@ public class TarExtractor { public TarExtractor(int threadPoolSize, String threadNamePrefix) { this.threadPoolSize = threadPoolSize; this.threadFactory = - new ThreadFactoryBuilder().setNameFormat("FetchOMDBTar-%d" + threadNamePrefix) + new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "FetchOMDBTar-%d") .build(); } @@ -163,8 +162,7 @@ private void writeFile(Path outputDir, String fileName, byte[] fileData) { public void start() { if (executorServiceStarted.compareAndSet(false, true)) { - this.executor = - new ThreadPoolExecutor(0, threadPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory); + this.executor = Executors.newFixedThreadPool(threadPoolSize, threadFactory); } } diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java index 6b3adf302da..4ba64be1a64 100644 --- 
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/DataNodeMetricsService.java @@ -19,6 +19,8 @@ import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_MINIMUM_API_DELAY_DEFAULT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT; +import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT; import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT; @@ -32,9 +34,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -61,11 +64,9 @@ public class DataNodeMetricsService { private static final Logger LOG = LoggerFactory.getLogger(DataNodeMetricsService.class); - private static final int MAX_POOL_SIZE = 500; - private static final int KEEP_ALIVE_TIME = 5; private static final int POLL_INTERVAL_MS = 200; - private final ThreadPoolExecutor executorService; + private final ExecutorService executorService; private final ReconNodeManager reconNodeManager; private final boolean httpsEnabled; private final int minimumApiDelayMs; @@ -96,14 +97,15 @@ 
public DataNodeMetricsService( OZONE_RECON_DN_METRICS_COLLECTION_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); this.metricsServiceProviderFactory = metricsServiceProviderFactory; this.lastCollectionEndTime.set(-minimumApiDelayMs); - int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; - this.executorService = new ThreadPoolExecutor( - corePoolSize, MAX_POOL_SIZE, - KEEP_ALIVE_TIME, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), - new ThreadFactoryBuilder() - .setNameFormat("DataNodeMetricsCollector-%d") - .build()); + int corePoolSize = config.getInt(OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT, + OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT); + corePoolSize = corePoolSize > 0 + ? corePoolSize + : OZONE_RECON_DN_METRICS_COLLECTION_THREAD_COUNT_DEFAULT; + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("DataNodeMetricsCollector-%d") + .build(); + this.executorService = Executors.newFixedThreadPool(corePoolSize, threadFactory); } /** diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java index dca33c759b8..cd62b2160da 100644 --- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java +++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/spi/impl/OzoneManagerServiceProviderImpl.java @@ -229,7 +229,7 @@ public OzoneManagerServiceProviderImpl( new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "SyncOM-%d") .build(); // Number of parallel workers - int omDBTarProcessorThreadCount = Math.max(64, Runtime.getRuntime().availableProcessors()); + int omDBTarProcessorThreadCount = Math.min(64, Runtime.getRuntime().availableProcessors()); this.reconContext = reconContext; this.taskStatusUpdaterManager = taskStatusUpdaterManager; this.omDBLagThreshold = 
configuration.getLong(RECON_OM_DELTA_UPDATE_LAG_THRESHOLD, diff --git a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/TestTarExtractor.java b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/TestTarExtractor.java new file mode 100644 index 00000000000..231bb653918 --- /dev/null +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/TestTarExtractor.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon; + +import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import java.io.File; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Tests for {@link TarExtractor}. 
+ */ +public class TestTarExtractor { + + @TempDir + private Path temporaryFolder; + + @Test + public void testExtractTarUsesMultipleThreadsForSingleTar() throws Exception { + Path sourceDir = Files.createDirectory(temporaryFolder.resolve("source")); + int poolSize = 4; + int fileCount = poolSize * 4; + byte[] payload = new byte[4 * 1024 * 1024]; + Arrays.fill(payload, (byte) 'a'); + for (int i = 0; i < fileCount; i++) { + byte[] content = Arrays.copyOf(payload, payload.length + 1); + content[payload.length] = (byte) ('0' + (i % 10)); + Files.write(sourceDir.resolve("file" + i + ".sst"), content); + } + + File tarFile = createTarFile(sourceDir); + Path outputDir = temporaryFolder.resolve("output"); + String threadPrefix = "TestTarExtractorParallel-"; + + TarExtractor extractor = new TarExtractor(poolSize, threadPrefix); + Set<String> writeFileThreads = ConcurrentHashMap.newKeySet(); + AtomicReference<Throwable> extractionError = new AtomicReference<>(); + + extractor.start(); + Thread extractionThread = new Thread(() -> { + try (InputStream input = Files.newInputStream(tarFile.toPath())) { + extractor.extractTar(input, outputDir); + } catch (Throwable t) { + extractionError.set(t); + } + }); + + try { + extractionThread.start(); + + long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(30); + while (extractionThread.isAlive() + && System.nanoTime() < deadlineNanos + && writeFileThreads.size() < poolSize) { + Thread.getAllStackTraces().forEach((thread, stackTrace) -> { + if (thread.getName().startsWith(threadPrefix) + && isInTarExtractorWritePath(stackTrace)) { + writeFileThreads.add(thread.getName()); + } + }); + Thread.sleep(1); + } + + extractionThread.join(TimeUnit.SECONDS.toMillis(30)); + assertFalse(extractionThread.isAlive(), "Tar extraction did not finish in time"); + + if (extractionError.get() != null) { + throw new AssertionError("Tar extraction failed", extractionError.get()); + } + + assertEquals(poolSize, writeFileThreads.size(), + "Expected writeFile to run 
in parallel across all configured worker threads"); + } finally { + extractor.stop(); + } + } + + private static boolean isInTarExtractorWritePath(StackTraceElement[] stackTrace) { + for (StackTraceElement stackElement : stackTrace) { + if (TarExtractor.class.getName().equals(stackElement.getClassName()) + && ("writeFile".equals(stackElement.getMethodName()) + || "lambda$extractTar$0".equals(stackElement.getMethodName()))) { + return true; + } + } + return false; + } +}