diff --git a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
index a8863046f6ee..17b4b7946a2d 100644
--- a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
+++ b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
@@ -222,7 +222,7 @@ private Response getContainerEndpointResponse(long containerId) {
         null, // ContainerHealthSchemaManager - not needed for this test
         recon.getReconServer().getReconNamespaceSummaryManager(),
         recon.getReconServer().getReconContainerMetadataManager(),
-        omMetadataManagerInstance);
+        omMetadataManagerInstance, null);
     return containerEndpoint.getKeysForContainer(containerId, 10, "");
   }
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 9a9dfb48e74b..2827ff6cc86e 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.recon.api.ExportJobManager;
 import org.apache.hadoop.ozone.recon.heatmap.HeatMapServiceImpl;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
 import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration;
@@ -110,6 +111,7 @@ protected void configure() {
     bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class);
     bind(ContainerHealthSchemaManager.class).in(Singleton.class);
+    bind(ExportJobManager.class).in(Singleton.class);
     bind(ReconContainerMetadataManager.class)
         .to(ReconContainerMetadataManagerImpl.class).in(Singleton.class);
     bind(ReconFileMetadataManager.class)
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index b4da42d8f03a..38b382c0c8de 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -253,6 +253,40 @@ public final class ReconServerConfigKeys {
       "ozone.recon.scm.container.id.batch.size";
   public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000;
+  /**
+   * JDBC fetch size for CSV exports.
+   * Default: 10,000 rows per fetch.
+   */
+  public static final String OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE =
+      "ozone.recon.unhealthy.container.fetch.size";
+  public static final int OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE_DEFAULT = 10_000;
+
+  /**
+   * Worker thread pool size for async CSV exports.
+   * Single-threaded by default to avoid concurrent database access.
+   * Default: 1
+   */
+  public static final String OZONE_RECON_EXPORT_WORKER_THREADS =
+      "ozone.recon.export.worker.threads";
+  public static final int OZONE_RECON_EXPORT_WORKER_THREADS_DEFAULT = 1;
+
+  /**
+   * Maximum number of export jobs in the queue (global limit).
+   * Jobs submitted beyond this limit are rejected.
+   * Default: 10
+   */
+  public static final String OZONE_RECON_EXPORT_MAX_JOBS_TOTAL =
+      "ozone.recon.export.max.jobs.total";
+  public static final int OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT = 10;
+
+  /**
+   * Directory to store export CSV files.
+   * Default: /tmp/recon/exports
+   */
+  public static final String OZONE_RECON_EXPORT_DIRECTORY =
+      "ozone.recon.export.directory";
+  public static final String OZONE_RECON_EXPORT_DIRECTORY_DEFAULT = "/tmp/recon/exports";
+
   /**
    * Private constructor for utility class.
    */
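These keys resolve through the standard Hadoop configuration lookup. A minimal sketch, assuming an OzoneConfiguration instance is at hand; only the key names and defaults above come from this file:

    OzoneConfiguration conf = new OzoneConfiguration();
    // 10 unless ozone.recon.export.max.jobs.total is overridden in ozone-site.xml
    int maxJobs = conf.getInt(
        ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_JOBS_TOTAL,
        ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT);
    // /tmp/recon/exports unless ozone.recon.export.directory is overridden
    String exportDir = conf.get(
        ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY,
        ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY_DEFAULT);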
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
index 4cf6ca85f6f7..73f14f0b9093 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
@@ -26,6 +26,9 @@
 import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_MIN_CONTAINER_ID;
 import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.time.Instant;
@@ -39,8 +42,10 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -48,6 +53,7 @@
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -67,6 +73,7 @@
 import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
 import org.apache.hadoop.ozone.recon.api.types.ContainersResponse;
 import org.apache.hadoop.ozone.recon.api.types.DeletedContainerInfo;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob;
 import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
 import org.apache.hadoop.ozone.recon.api.types.KeyMetadata.ContainerBlockMetadata;
 import org.apache.hadoop.ozone.recon.api.types.KeysResponse;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
 import org.apache.hadoop.ozone.recon.persistence.ContainerHistory;
 import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
@@ -104,6 +112,7 @@ public class ContainerEndpoint {
   private final ContainerHealthSchemaManager containerHealthSchemaManager;
   private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
   private final OzoneStorageContainerManager reconSCM;
+  private final ExportJobManager exportJobManager;
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerEndpoint.class);
   private BucketLayout layout = BucketLayout.DEFAULT;
@@ -145,7 +154,8 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
       ContainerHealthSchemaManager containerHealthSchemaManager,
      ReconNamespaceSummaryManager reconNamespaceSummaryManager,
       ReconContainerMetadataManager reconContainerMetadataManager,
-      ReconOMMetadataManager omMetadataManager) {
+      ReconOMMetadataManager omMetadataManager,
+      ExportJobManager exportJobManager) {
     this.containerManager =
         (ReconContainerManager) reconSCM.getContainerManager();
     this.pipelineManager = reconSCM.getPipelineManager();
@@ -154,6 +164,7 @@ public ContainerEndpoint(OzoneStorageContainerManager reconSCM,
     this.reconSCM = reconSCM;
     this.reconContainerMetadataManager = reconContainerMetadataManager;
     this.omMetadataManager = omMetadataManager;
+    this.exportJobManager = exportJobManager;
   }

   /**
@@ -502,6 +513,152 @@ public Response getUnhealthyContainers(
         minContainerId);
   }

+  /**
+   * List all export jobs tracked by the server (any status).
+   *
+   * @return Response containing a list of ExportJob objects
+   */
+  @GET
+  @Path("/unhealthy/export")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response listExportJobs() {
+    List<ExportJob> jobs = exportJobManager.getAllJobs();
+    for (ExportJob job : jobs) {
+      if (job.getStatus() == ExportJob.JobStatus.QUEUED) {
+        job.setQueuePosition(exportJobManager.getQueuePosition(job.getJobId()));
+      }
+    }
+    return Response.ok(jobs).build();
+  }
+
+  /**
+   * Start an async CSV export job for unhealthy containers.
+   * Returns immediately with a job ID that the client can poll.
+   *
+   * @param state  The container state (required: MISSING, UNDER_REPLICATED, etc.)
+   * @param userId User ID used to tag the job and its output file
+   *               (defaults to "anonymous")
+   * @return Response containing ExportJob with jobId
+   */
+  @POST
+  @Path("/unhealthy/export")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response startExport(
+      @QueryParam("state") String state,
+      @DefaultValue("anonymous") @QueryParam("userId") String userId) {
+
+    if (StringUtils.isEmpty(state)) {
+      throw new WebApplicationException("state query parameter is required",
+          Response.Status.BAD_REQUEST);
+    }
+
+    // Validate the state parameter against the known unhealthy states.
+    try {
+      ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state);
+    } catch (IllegalArgumentException e) {
+      throw new WebApplicationException("Invalid state: " + state,
+          Response.Status.BAD_REQUEST);
+    }
+
+    try {
+      String jobId = exportJobManager.submitJob(userId, state, -1, 0);
+      ExportJob job = exportJobManager.getJob(jobId);
+      return Response.ok(job).build();
+    } catch (IllegalStateException e) {
+      // Queue is full: return a JSON error response instead of HTML.
+      Map<String, String> errorResponse = new HashMap<>();
+      errorResponse.put("error", "Too Many Requests");
+      errorResponse.put("message", e.getMessage());
+      return Response.status(Response.Status.TOO_MANY_REQUESTS)
+          .entity(errorResponse)
+          .type(MediaType.APPLICATION_JSON)
+          .build();
+    }
+  }
+
+  /**
+   * Get the status of an export job.
+   *
+   * @param jobId The job ID returned by startExport
+   * @return Response containing the ExportJob with current status/progress
+   */
+  @GET
+  @Path("/unhealthy/export/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getExportStatus(@PathParam("jobId") String jobId) {
+    ExportJob job = exportJobManager.getJob(jobId);
+    if (job == null) {
+      throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND);
+    }
+
+    // Calculate and set the queue position while the job is still QUEUED.
+    if (job.getStatus() == ExportJob.JobStatus.QUEUED) {
+      int position = exportJobManager.getQueuePosition(jobId);
+      job.setQueuePosition(position);
+    }
+
+    return Response.ok(job).build();
+  }
+
+  /**
+   * Download a completed export TAR file.
+   *
+   * @param jobId The job ID
+   * @return Response with TAR file stream
+   */
+  @GET
+  @Path("/unhealthy/export/{jobId}/download")
+  @Produces("application/x-tar")
+  public Response downloadExport(@PathParam("jobId") String jobId) {
+    ExportJob job = exportJobManager.getJob(jobId);
+    if (job == null) {
+      throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND);
+    }
+    if (job.getStatus() != ExportJob.JobStatus.COMPLETED) {
+      throw new WebApplicationException("Job not completed yet", Response.Status.CONFLICT);
+    }
+
+    File file = new File(job.getFilePath());
+    if (!file.exists()) {
+      throw new WebApplicationException("Export file not found", Response.Status.NOT_FOUND);
+    }
+
+    StreamingOutput stream = outputStream -> {
+      try (FileInputStream fis = new FileInputStream(file);
+           BufferedOutputStream bos = new BufferedOutputStream(outputStream, 256 * 1024)) {
+        byte[] buffer = new byte[8192];
+        int bytesRead;
+        while ((bytesRead = fis.read(buffer)) != -1) {
+          bos.write(buffer, 0, bytesRead);
+        }
+        bos.flush();
+      }
+    };
+
+    // Extract the filename from the full path
+    // (e.g., /tmp/recon/exports/export_missing_webui_d5854cc2.tar).
+    String filename = job.getFilePath().substring(job.getFilePath().lastIndexOf('/') + 1);
+
+    return Response.ok(stream)
+        .header("Content-Disposition", "attachment; filename=\"" + filename + "\"")
+        .header("Content-Type", "application/x-tar")
+        .build();
+  }
+
+  /**
+   * Cancel a queued or running export job.
+   *
+   * @param jobId The job ID
+   * @return Response with 200 if successful
+   */
+  @DELETE
+  @Path("/unhealthy/export/{jobId}")
+  public Response cancelExport(@PathParam("jobId") String jobId) {
+    if (exportJobManager.getJob(jobId) == null) {
+      throw new WebApplicationException("Job not found", Response.Status.NOT_FOUND);
+    }
+    try {
+      exportJobManager.cancelJob(jobId);
+      return Response.ok().build();
+    } catch (IllegalStateException e) {
+      // The job has already completed or failed, so there is nothing to cancel.
+      throw new WebApplicationException(e.getMessage(), Response.Status.CONFLICT);
+    }
+  }
+
   /**
    * This API will return all DELETED containers in SCM in below JSON format.
    * {
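Together these endpoints form the whole job lifecycle: POST /unhealthy/export to submit, GET /unhealthy/export/{jobId} to poll, GET /unhealthy/export/{jobId}/download once COMPLETED, and DELETE /unhealthy/export/{jobId} to cancel. A sketch of that flow in the direct-call style of the integration test above — the containerEndpoint instance and the 3-second poll interval are assumptions; the method signatures are the ones defined in this file:

    Response submitted = containerEndpoint.startExport("MISSING", "cli-user");
    String jobId = ((ExportJob) submitted.getEntity()).getJobId();

    // Poll until the job leaves the QUEUED/RUNNING states.
    ExportJob job;
    do {
      Thread.sleep(3000);
      job = (ExportJob) containerEndpoint.getExportStatus(jobId).getEntity();
    } while (job.getStatus() == ExportJob.JobStatus.QUEUED
        || job.getStatus() == ExportJob.JobStatus.RUNNING);

    if (job.getStatus() == ExportJob.JobStatus.COMPLETED) {
      // Over HTTP this is GET /api/v1/containers/unhealthy/export/{jobId}/download
      Response tar = containerEndpoint.downloadExport(jobId);
    }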
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java
new file mode 100644
index 000000000000..30eb36684986
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob.JobStatus;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
+import org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
+import org.jooq.Cursor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages asynchronous CSV export jobs.
+ */
+@Singleton
+public class ExportJobManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ExportJobManager.class);
+
+  private final Map<String, ExportJob> jobTracker = new ConcurrentHashMap<>();
+  private final LinkedHashMap<String, ExportJob> jobQueue = new LinkedHashMap<>();
+  private final Map<String, Future<?>> runningTasks = new ConcurrentHashMap<>();
+  private final ExecutorService workerPool;
+  private final ContainerHealthSchemaManager containerHealthSchemaManager;
+  private final String exportDirectory;
+  private final int maxQueueSize;
+
+  @Inject
+  public ExportJobManager(ContainerHealthSchemaManager containerHealthSchemaManager,
+                          OzoneConfiguration conf) {
+    this.containerHealthSchemaManager = containerHealthSchemaManager;
+
+    // Defaults to a single worker thread so the database is never read concurrently.
+    int workerThreads = Math.max(1, conf.getInt(
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_WORKER_THREADS,
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_WORKER_THREADS_DEFAULT));
+    this.workerPool = Executors.newFixedThreadPool(workerThreads);
+
+    this.maxQueueSize = conf.getInt(
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_JOBS_TOTAL,
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT);
+
+    this.exportDirectory = conf.get(
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY,
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY_DEFAULT);
+
+    // Create the export directory if it doesn't exist.
+    try {
+      Files.createDirectories(Paths.get(exportDirectory));
+    } catch (IOException e) {
+      LOG.error("Failed to create export directory: {}", exportDirectory, e);
+    }
+
+    LOG.info("ExportJobManager initialized with {} worker thread(s), max {} queued jobs",
+        workerThreads, maxQueueSize);
+  }
+
+  public synchronized String submitJob(String userId, String state, int limit, long prevKey) {
+    // Check the queue size limit.
+    synchronized (jobQueue) {
+      if (jobQueue.size() >= maxQueueSize) {
+        throw new IllegalStateException("Export queue is full (max "
+            + maxQueueSize + " jobs). Please try again later.");
+      }
+    }
+    String jobId = UUID.randomUUID().toString();
+    ExportJob job = new ExportJob(jobId, userId, state, limit, prevKey);
+    // Filename format: export_{state}_{userId}_{shortJobId}.tar
+    String shortJobId = jobId.substring(0, 8);
+    String filePath = exportDirectory + "/export_" + state.toLowerCase()
+        + "_" + userId + "_" + shortJobId + ".tar";
+    job.setFilePath(filePath);
+
+    jobTracker.put(jobId, job);
+
+    // Add to the queue (LinkedHashMap maintains insertion order).
+    synchronized (jobQueue) {
+      jobQueue.put(jobId, job);
+    }
+
+    // Hand the job to the worker pool.
+    Future<?> future = workerPool.submit(() -> executeExport(job));
+    runningTasks.put(jobId, future);
+
+    int queuePosition = getQueuePosition(jobId);
+    LOG.info("Submitted export job {} for user {} (state={}, limit={}, queue position={})",
+        jobId, userId, state, limit, queuePosition);
+
+    return jobId;
+  }
+
+  public ExportJob getJob(String jobId) {
+    return jobTracker.get(jobId);
+  }
+
+  /**
+   * Returns all tracked export jobs (any status).
+   */
+  public List<ExportJob> getAllJobs() {
+    return new ArrayList<>(jobTracker.values());
+  }
+
+  /**
+   * Get the queue position for a job (1-indexed).
+   * Returns 0 if the job is not in the queue (running, completed, or not found).
+   */
+  public int getQueuePosition(String jobId) {
+    synchronized (jobQueue) {
+      if (!jobQueue.containsKey(jobId)) {
+        return 0;
+      }
+
+      int position = 1;
+      for (String id : jobQueue.keySet()) {
+        if (id.equals(jobId)) {
+          return position;
+        }
+        position++;
+      }
+      return 0;
+    }
+  }
+
+  public void cancelJob(String jobId) {
+    ExportJob job = jobTracker.get(jobId);
+    if (job == null) {
+      throw new IllegalStateException("Job not found: " + jobId);
+    }
+
+    if (job.getStatus() == JobStatus.COMPLETED || job.getStatus() == JobStatus.FAILED) {
+      throw new IllegalStateException("Job already completed or failed");
+    }
+
+    // Remove from the queue if still queued.
+    synchronized (jobQueue) {
+      jobQueue.remove(jobId);
+    }
+
+    Future<?> future = runningTasks.get(jobId);
+    if (future != null) {
+      future.cancel(true);
+      runningTasks.remove(jobId);
+    }
+
+    job.setStatus(JobStatus.FAILED);
+    job.setErrorMessage("Cancelled by user");
+
+    // Delete partial files and the job directory.
+    deleteDirectory(Paths.get(exportDirectory + "/" + jobId));
+    deleteFileQuietly(job.getFilePath());
+
+    LOG.info("Cancelled export job {}", jobId);
+  }
+
+  private void executeExport(ExportJob job) {
+    String jobDirectory = exportDirectory + "/" + job.getJobId();
+    Path jobDir = Paths.get(jobDirectory);
+    String tarFilePath = job.getFilePath(); // Use the filename set in submitJob.
+
+    try {
+      // Create a job-specific directory for the CSV files.
+      Files.createDirectories(jobDir);
+
+      // Remove from the queue and mark as running.
+      synchronized (jobQueue) {
+        jobQueue.remove(job.getJobId());
+      }
+      job.setStatus(JobStatus.RUNNING);
+      LOG.info("Starting export job {}", job.getJobId());
+
+      ContainerSchemaDefinition.UnHealthyContainerStates internalState =
+          ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(job.getState());
+
+      // Get the total count first for progress tracking.
+      long estimatedTotal = containerHealthSchemaManager.getUnhealthyContainersCount(
+          internalState, job.getLimit(), job.getPrevKey());
+      job.setEstimatedTotal(estimatedTotal);
+      LOG.info("Export job {} will process approximately {} records",
+          job.getJobId(), estimatedTotal);
+
+      // Open a database cursor and stream the records.
+      try (Cursor<UnhealthyContainersRecord> cursor =
+          containerHealthSchemaManager.getUnhealthyContainersCursor(
+              internalState, job.getLimit(), job.getPrevKey())) {
+
+        int fileIndex = 1;
+        int filesWritten = 0;
+        long totalRecords = 0;
+        long recordsInCurrentFile = 0;
+        final int CHUNK_SIZE = 500_000;
+
+        BufferedWriter writer = null;
+
+        try {
+          while (cursor.hasNext()) {
+            // Check for cancellation.
+            if (Thread.currentThread().isInterrupted()) {
+              throw new InterruptedException("Job cancelled");
+            }
+
+            // Start a new CSV file if needed.
+            if (recordsInCurrentFile == 0) {
+              // Close the previous file if one is open.
+              if (writer != null) {
+                writer.flush();
+                writer.close();
+              }
+
+              String csvFileName = String.format("%s/unhealthy_containers_%s_part%03d.csv",
+                  jobDirectory, job.getState().toLowerCase(), fileIndex);
+              writer = new BufferedWriter(new OutputStreamWriter(
+                  new FileOutputStream(csvFileName), StandardCharsets.UTF_8));
+              filesWritten++;
+
+              // Write the CSV header.
+              writer.write("container_id,container_state,in_state_since,"
+                  + "expected_replica_count,actual_replica_count,replica_delta\n");
+
+              LOG.info("Created CSV file: part{}", fileIndex);
+            }
+
+            // Fetch and write one record.
+            UnhealthyContainersRecord rec = cursor.fetchNext();
+            StringBuilder sb = new StringBuilder(128);
+            sb.append(rec.getContainerId()).append(',')
+                .append(rec.getContainerState()).append(',')
+                .append(rec.getInStateSince()).append(',')
+                .append(rec.getExpectedReplicaCount()).append(',')
+                .append(rec.getActualReplicaCount()).append(',')
+                .append(rec.getReplicaDelta()).append('\n');
+            writer.write(sb.toString());
+
+            totalRecords++;
+            recordsInCurrentFile++;
+            job.setTotalRecords(totalRecords);
+
+            // Roll over to the next file once the chunk limit is reached.
+            if (recordsInCurrentFile >= CHUNK_SIZE) {
+              writer.flush();
+              writer.close();
+              writer = null;
+              recordsInCurrentFile = 0;
+              fileIndex++;
+            }
+
+            // Flush every 10K rows so progress is durable.
+            if (recordsInCurrentFile > 0 && recordsInCurrentFile % 10000 == 0) {
+              writer.flush();
+            }
+          }
+
+          // Close the last file.
+          if (writer != null) {
+            writer.flush();
+            writer.close();
+            writer = null;
+          }
+
+        } finally {
+          if (writer != null) {
+            try {
+              writer.close();
+            } catch (IOException e) {
+              LOG.warn("Error closing writer", e);
+            }
+          }
+        }
+
+        LOG.info("Export job {} wrote {} records across {} files",
+            job.getJobId(), totalRecords, filesWritten);
+
+        // Create the TAR archive.
+        File tarFile = new File(tarFilePath);
+        Archiver.create(tarFile, jobDir);
+        LOG.info("Created TAR archive: {}", tarFilePath);
+
+        // Delete the CSV files and the job directory.
+        deleteDirectory(jobDir);
+        LOG.info("Deleted temporary CSV files for job {}", job.getJobId());
+
+        // Update the job with the TAR file path.
+        job.setFilePath(tarFilePath);
+        job.setStatus(JobStatus.COMPLETED);
+        LOG.info("Completed export job {} ({} records)", job.getJobId(), totalRecords);
+
+      } catch (InterruptedException e) {
+        job.setStatus(JobStatus.FAILED);
+        job.setErrorMessage("Job was cancelled");
+        deleteDirectory(jobDir);
+        deleteFileQuietly(tarFilePath);
+        LOG.info("Export job {} was cancelled", job.getJobId());
+        Thread.currentThread().interrupt();
+      }
+
+    } catch (Exception e) {
+      job.setStatus(JobStatus.FAILED);
+      job.setErrorMessage(e.getMessage());
+      deleteDirectory(jobDir);
+      deleteFileQuietly(tarFilePath);
+      LOG.error("Export job {} failed", job.getJobId(), e);
+    } finally {
+      // 3-second cooldown before the worker picks up the next queued job.
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+      runningTasks.remove(job.getJobId());
+    }
+  }
+
+  private void deleteDirectory(Path directory) {
+    try {
+      if (Files.exists(directory)) {
+        Files.walk(directory)
+            .sorted(Comparator.reverseOrder())
+            .map(Path::toFile)
+            .forEach(File::delete);
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to delete directory: {}", directory, e);
+    }
+  }
+
+  private void deleteFileQuietly(String filePath) {
+    try {
+      Files.deleteIfExists(Paths.get(filePath));
+    } catch (IOException e) {
+      LOG.warn("Failed to delete file: {}", filePath, e);
+    }
+  }
+
+  @PreDestroy
+  public void shutdown() {
+    LOG.info("Shutting down ExportJobManager");
+    workerPool.shutdownNow();
+    try {
+      if (!workerPool.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOG.warn("Timed out waiting for export worker shutdown");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for executor shutdown", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+}
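One subtlety worth spelling out: jobQueue only holds jobs that have not yet started, so a getQueuePosition result of 0 is overloaded to mean running, finished, or unknown. A small sketch of the observable sequence under the default single worker — the manager handle is assumed to be injected, and the comments describe the intended timing:

    String first = manager.submitJob("alice", "MISSING", -1, 0);
    String second = manager.submitJob("bob", "UNDER_REPLICATED", -1, 0);

    // Once the worker picks 'first' up, executeExport removes it from jobQueue:
    manager.getJob(first).getStatus();   // RUNNING
    manager.getQueuePosition(first);     // 0 — no longer queued
    manager.getQueuePosition(second);    // 1 — next in line
    manager.cancelJob(second);           // allowed while QUEUED or RUNNING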
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java
new file mode 100644
index 000000000000..f1bba5ce8fe4
--- /dev/null
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents an asynchronous CSV export job.
+ */
+public class ExportJob {
+
+  public enum JobStatus {
+    QUEUED,     // Waiting for a worker thread
+    RUNNING,    // Actively exporting
+    COMPLETED,  // File ready for download
+    FAILED      // Error occurred or cancelled
+  }
+
+  @JsonProperty("jobId")
+  private String jobId;
+
+  @JsonProperty("userId")
+  private String userId;
+
+  @JsonProperty("state")
+  private String state;
+
+  @JsonProperty("limit")
+  private int limit;
+
+  @JsonProperty("prevKey")
+  private long prevKey;
+
+  @JsonProperty("status")
+  private JobStatus status;
+
+  @JsonProperty("submittedAt")
+  private long submittedAt;
+
+  @JsonProperty("startedAt")
+  private long startedAt;
+
+  @JsonProperty("completedAt")
+  private long completedAt;
+
+  @JsonProperty("totalRecords")
+  private long totalRecords;
+
+  @JsonProperty("estimatedTotal")
+  private long estimatedTotal;
+
+  @JsonProperty("filePath")
+  private String filePath;
+
+  @JsonProperty("errorMessage")
+  private String errorMessage;
+
+  @JsonProperty("queuePosition")
+  private int queuePosition;
+
+  public ExportJob(String jobId, String userId, String state, int limit, long prevKey) {
+    this.jobId = jobId;
+    this.userId = userId;
+    this.state = state;
+    this.limit = limit;
+    this.prevKey = prevKey;
+    this.status = JobStatus.QUEUED;
+    this.submittedAt = System.currentTimeMillis();
+    this.totalRecords = 0;
+    this.estimatedTotal = -1;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public String getUserId() {
+    return userId;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public long getPrevKey() {
+    return prevKey;
+  }
+
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  public void setStatus(JobStatus status) {
+    this.status = status;
+    if (status == JobStatus.RUNNING && startedAt == 0) {
+      startedAt = System.currentTimeMillis();
+    } else if ((status == JobStatus.COMPLETED || status == JobStatus.FAILED) && completedAt == 0) {
+      completedAt = System.currentTimeMillis();
+    }
+  }
+
+  public long getSubmittedAt() {
+    return submittedAt;
+  }
+
+  public long getStartedAt() {
+    return startedAt;
+  }
+
+  public long getCompletedAt() {
+    return completedAt;
+  }
+
+  public long getTotalRecords() {
+    return totalRecords;
+  }
+
+  public void setTotalRecords(long totalRecords) {
+    this.totalRecords = totalRecords;
+  }
+
+  public void incrementTotalRecords() {
+    this.totalRecords++;
+  }
+
+  public long getEstimatedTotal() {
+    return estimatedTotal;
+  }
+
+  public void setEstimatedTotal(long estimatedTotal) {
+    this.estimatedTotal = estimatedTotal;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+
+  public void setErrorMessage(String errorMessage) {
+    this.errorMessage = errorMessage;
+  }
+
+  // Derived on the fly from totalRecords/estimatedTotal; serialized via the getter.
+  @JsonProperty("progressPercent")
+  public int getProgressPercent() {
+    if (estimatedTotal > 0 && totalRecords > 0) {
+      return (int) ((totalRecords * 100) / estimatedTotal);
+    }
+    return 0;
+  }
+
+  public int getQueuePosition() {
+    return queuePosition;
+  }
+
+  public void setQueuePosition(int queuePosition) {
+    this.queuePosition = queuePosition;
+  }
+}
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java
index ac1e91350cc6..48f5734fbc32 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java
@@ -36,6 +36,7 @@
 import org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
 import org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
 import org.jooq.Condition;
+import org.jooq.Cursor;
 import org.jooq.DSLContext;
 import org.jooq.OrderField;
 import org.jooq.Record;
@@ -395,6 +396,82 @@ public void clearAllUnhealthyContainerRecords() {
     }
   }

+  /**
+   * Get the total count of unhealthy containers for a given state.
+   *
+   * @param state   The container health state to filter by
+   * @param limit   Maximum number of records to count (-1 for unlimited)
+   * @param prevKey Container ID offset for cursor-based pagination
+   * @return Total count of matching containers
+   */
+  public long getUnhealthyContainersCount(
+      UnHealthyContainerStates state, int limit, long prevKey) {
+    DSLContext dslContext = containerSchemaDefinition.getDSLContext();
+
+    Condition whereCondition = UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString());
+
+    if (prevKey > 0) {
+      whereCondition = whereCondition.and(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+    }
+
+    long totalCount = dslContext.selectCount()
+        .from(UNHEALTHY_CONTAINERS)
+        .where(whereCondition)
+        .fetchOne(0, long.class);
+
+    // If a limit is set and is less than the total, the limit is the estimated total.
+    if (limit > 0 && limit < totalCount) {
+      return limit;
+    }
+
+    return totalCount;
+  }
+
+  /**
+   * Returns a streaming cursor over unhealthy container records for a given state.
+   * The caller MUST close the cursor.
+   *
+   * Generated SQL example (50,000 MISSING containers, starting after container ID 12345):
+   *
+   *   SELECT * FROM unhealthy_containers
+   *   WHERE container_state = 'MISSING'
+   *     AND container_id > 12345
+   *   ORDER BY container_id ASC
+   *   LIMIT 50000
+   *
+   * @param state   filter by state (required)
+   * @param limit   max records to return, -1 = unlimited
+   * @param prevKey previous container ID to skip, for cursor-based pagination
+   * @return Cursor over UnhealthyContainersRecord
+   */
+  public Cursor<UnhealthyContainersRecord> getUnhealthyContainersCursor(
+      UnHealthyContainerStates state, int limit, long prevKey) {
+    DSLContext dslContext = containerSchemaDefinition.getDSLContext();
+    SelectQuery<UnhealthyContainersRecord> query =
+        dslContext.selectFrom(UNHEALTHY_CONTAINERS).getQuery();
+
+    // WHERE container_state = ?
+    query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()));
+
+    if (prevKey > 0) {
+      // AND container_id > ? (cursor-based pagination)
+      query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+    }
+
+    // ORDER BY container_id ASC — matches the composite index (state, container_id),
+    // so Derby walks it in order with no sort step.
+    query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc());
+
+    if (limit > 0) {
+      query.addLimit(limit);
+    }
+
+    // Rows Derby returns per JDBC round-trip; 10,000 matches the
+    // ozone.recon.unhealthy.container.fetch.size default.
+    query.fetchSize(10000);
+
+    return query.fetchLazy();
+  }
+
   /**
    * POJO representing a record in UNHEALTHY_CONTAINERS table.
    */
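Because fetchLazy() keeps only one fetch batch in memory, callers can stream arbitrarily large result sets; executeExport above relies on exactly this. A minimal consumption sketch against this method's contract — the schemaManager handle and the process(...) callback are illustrative assumptions:

    // Stream every MISSING container without materializing the full result set.
    try (Cursor<UnhealthyContainersRecord> cursor =
        schemaManager.getUnhealthyContainersCursor(
            UnHealthyContainerStates.MISSING, -1, 0)) {
      while (cursor.hasNext()) {
        UnhealthyContainersRecord rec = cursor.fetchNext();
        process(rec.getContainerId(), rec.getReplicaDelta());
      }
    } // closing the cursor releases the underlying ResultSet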
diff --git a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx
index 2b1ca3d24994..405975c8e70e 100644
--- a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx
+++ b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx
@@ -16,10 +16,23 @@
  * limitations under the License.
  */
-import React, { useState, useEffect } from "react";
+import React, { useState, useEffect, useRef } from "react";
 import moment from "moment";
-import { Card, Row, Tabs } from "antd";
+import {
+  Button,
+  Card,
+  message,
+  Progress,
+  Row,
+  Select,
+  Table,
+  Tag,
+  Tabs,
+  Tooltip,
+} from "antd";
+import { DownloadOutlined, ExportOutlined } from "@ant-design/icons";
 import { ValueType } from "react-select/src/types";
+import { ColumnsType } from "antd/es/table";

 import Search from "@/v2/components/search/search";
 import MultiSelect, { Option } from "@/v2/components/select/multiSelect";
@@ -35,6 +48,7 @@ import {
   ContainersPaginationResponse,
   ContainerState,
   ExpandedRow,
+  ExportJob,
   TabPaginationState,
 } from "@/v2/types/container.types";
 import { ClusterStateResponse } from "@/v2/types/overview.types";
@@ -52,6 +66,14 @@ const TAB_STATE_MAP: Record<string, ContainerState> = {
   '5': 'REPLICA_MISMATCH',
 };

+const EXPORT_STATE_OPTIONS = [
+  { label: 'Missing', value: 'MISSING' },
+  { label: 'Under-Replicated', value: 'UNDER_REPLICATED' },
+  { label: 'Over-Replicated', value: 'OVER_REPLICATED' },
+  { label: 'Mis-Replicated', value: 'MIS_REPLICATED' },
+  { label: 'Replica Mismatch', value: 'REPLICA_MISMATCH' },
+];
+
 const SearchableColumnOpts = [{
   label: 'Container ID',
   value: 'containerID'
@@ -75,6 +97,8 @@ const DEFAULT_TAB_STATE: TabPaginationState = {
   hasNextPage: false,
 };

+const POLL_INTERVAL_MS = 3000;
+
 const Containers: React.FC<{}> = () => {
   const [state, setState] = useState({
     lastUpdated: 0,
@@ -100,6 +124,12 @@ const Containers: React.FC<{}> = () => {
   const [selectedTab, setSelectedTab] = useState('1');
   const [searchColumn, setSearchColumn] = useState<'containerID' | 'pipelineID'>('containerID');

+  // Export tab state
+  const [exportJobs, setExportJobs] = useState<ExportJob[]>([]);
+  const [selectedExportState, setSelectedExportState] = useState<string>('MISSING');
+  const [exportSubmitting, setExportSubmitting] = useState(false);
+  const pollTimerRef = useRef<ReturnType<typeof setInterval> | null>(null);
+
   const debouncedSearch = useDebounce(searchTerm, 300);

   const clusterState = useApiData(
@@ -121,17 +151,111 @@ const Containers: React.FC<{}> = () => {
     }
   }, [clusterState.data]);

-  // Fetch a single page for a tab using cursor-based pagination.
-  // minContainerId=0 means "start from the beginning".
-  // currentPageSize is passed explicitly so callers (e.g. size-change handler) can
-  // provide the new value before React state has updated.
+  // ── Polling ──────────────────────────────────────────────────────────────
+  const fetchExportJobs = async () => {
+    try {
+      const jobs = await fetchData<ExportJob[]>(
+        '/api/v1/containers/unhealthy/export'
+      );
+      setExportJobs(jobs ?? []);
+      // Stop polling when no active jobs remain.
+      const hasActive = (jobs ?? []).some(
+        j => j.status === 'QUEUED' || j.status === 'RUNNING'
+      );
+      if (!hasActive && pollTimerRef.current) {
+        clearInterval(pollTimerRef.current);
+        pollTimerRef.current = null;
+      }
+    } catch (err) {
+      // Silent — polling errors shouldn't break the UI.
+    }
+  };
+
+  const startPolling = () => {
+    if (pollTimerRef.current) return; // already polling
+    fetchExportJobs(); // immediate fetch
+    pollTimerRef.current = setInterval(fetchExportJobs, POLL_INTERVAL_MS);
+  };
+
+  // Start polling when the Export tab is active; stop when leaving it
+  // unless jobs are still active.
+  useEffect(() => {
+    if (selectedTab === '6') {
+      startPolling();
+    } else {
+      const hasActive = exportJobs.some(
+        j => j.status === 'QUEUED' || j.status === 'RUNNING'
+      );
+      if (!hasActive && pollTimerRef.current) {
+        clearInterval(pollTimerRef.current);
+        pollTimerRef.current = null;
+      }
+    }
+    return () => {
+      // Intentionally empty: an active poller must survive tab switches and
+      // React StrictMode remounts; fetchExportJobs stops it once no
+      // QUEUED/RUNNING jobs remain, and the unmount effect below clears it.
+    };
+  }, [selectedTab]); // eslint-disable-line react-hooks/exhaustive-deps
+
+  // Clear on component unmount.
+  useEffect(() => {
+    return () => {
+      if (pollTimerRef.current) {
+        clearInterval(pollTimerRef.current);
+      }
+    };
+  }, []);
+
+  // ── Export submit ─────────────────────────────────────────────────────────
+  const handleSubmitExport = async () => {
+    setExportSubmitting(true);
+    try {
+      const response = await fetch(
+        `/api/v1/containers/unhealthy/export?state=${selectedExportState}&userId=webui`,
+        { method: 'POST' }
+      );
+      if (!response.ok) {
+        // Read the body once; it may be JSON from the endpoint or an HTML error page.
+        let errorMsg = `Failed to start export (HTTP ${response.status})`;
+        const text = await response.text();
+        try {
+          const body = JSON.parse(text);
+          errorMsg = body.message || body.error || errorMsg;
+        } catch {
+          if (text && !text.includes('<html')) errorMsg = text;
+        }
+        // Use a longer duration for queue-full errors so the user has time to read them.
+        const duration = response.status === 429 ? 6 : 4;
+        message.error({ content: errorMsg, duration });
+        return;
+      }
+      await fetchExportJobs();
+      startPolling();
+      message.success({
+        content: 'Export job submitted. Track progress in the table below.',
+        duration: 3,
+      });
+    } catch (err: any) {
+      message.error({ content: `Export failed: ${err.message || err}`, duration: 4 });
+    } finally {
+      setExportSubmitting(false);
+    }
+  };
+
+  // ── Download helper ───────────────────────────────────────────────────────
+  const downloadFile = (jobId: string, filePath: string) => {
+    const filename = filePath ? filePath.split('/').pop() : `${jobId}.tar`;
+    const link = document.createElement('a');
+    link.href = `/api/v1/containers/unhealthy/export/${jobId}/download`;
+    link.download = filename ?? `${jobId}.tar`;
+    document.body.appendChild(link);
+    link.click();
+    document.body.removeChild(link);
+  };
+
+  // ── Container data fetching ───────────────────────────────────────────────
   const fetchTabData = async (
     tabKey: string,
     minContainerId: number,
     currentPageSize: number
   ) => {
     const containerStateName = TAB_STATE_MAP[tabKey];
-    // Fetch one extra item so we can detect a next page without a separate count request.
+    if (!containerStateName) return; // skip Export tab (key='6') or unknown keys
     const fetchSize = currentPageSize + 1;

     setTabStates(prev => ({
@@ -147,12 +271,8 @@ const Containers: React.FC<{}> = () => {
     );

     const allContainers = response.containers ?? [];
-    // If we received more than currentPageSize items, a next page exists.
const hasNextPage = allContainers.length > currentPageSize; - // Always display at most currentPageSize rows. const containers = allContainers.slice(0, currentPageSize); - // Derive cursor keys from the visible slice, not the full response, - // so the next-page request starts exactly after the last displayed row. const lastKey = containers.length > 0 ? Math.max(...containers.map(c => c.containerID)) : 0; @@ -173,7 +293,6 @@ const Containers: React.FC<{}> = () => { }, })); - // Summary counts are returned by every tab endpoint. setState(prev => ({ ...prev, missingCount: response.missingCount ?? 0, @@ -192,7 +311,6 @@ const Containers: React.FC<{}> = () => { } }; - // Initial fetch on mount. useEffect(() => { fetchTabData('1', 0, DEFAULT_PAGE_SIZE); }, []); // eslint-disable-line react-hooks/exhaustive-deps @@ -203,8 +321,7 @@ const Containers: React.FC<{}> = () => { function handleTabChange(key: string) { setSelectedTab(key); - // Lazy-load: fetch first page only if the tab has never been loaded. - if (tabStates[key].data.length === 0 && !tabStates[key].loading) { + if (key !== '6' && tabStates[key]?.data.length === 0 && !tabStates[key]?.loading) { fetchTabData(key, 0, pageSize); } } @@ -212,8 +329,6 @@ const Containers: React.FC<{}> = () => { function handleNextPage(tabKey: string) { const tab = tabStates[tabKey]; if (tab.loading || !tab.hasNextPage) return; - - // Push the current minContainerId so we can navigate back. setTabStates(prev => ({ ...prev, [tabKey]: { @@ -227,10 +342,8 @@ const Containers: React.FC<{}> = () => { function handlePrevPage(tabKey: string) { const tab = tabStates[tabKey]; if (tab.loading || tab.pageHistory.length === 0) return; - const history = [...tab.pageHistory]; const prevMinContainerId = history.pop() ?? 0; - setTabStates(prev => ({ ...prev, [tabKey]: { ...prev[tabKey], pageHistory: history }, @@ -238,7 +351,6 @@ const Containers: React.FC<{}> = () => { fetchTabData(tabKey, prevMinContainerId, pageSize); } - // Changing page size resets all tabs and re-fetches the active tab from page 1. function handlePageSizeChange(newSize: number) { setPageSize(newSize); const reset = { @@ -252,7 +364,6 @@ const Containers: React.FC<{}> = () => { fetchTabData(selectedTab, 0, newSize); } - // Full refresh: reset all tab states and re-fetch the active tab from page 1. const loadContainersData = () => { setTabStates({ '1': { ...DEFAULT_TAB_STATE }, @@ -278,14 +389,132 @@ const Containers: React.FC<{}> = () => { replicaMismatchCount, } = state; - const currentTabState = tabStates[selectedTab]; + const currentTabState = tabStates[selectedTab] ?? 
DEFAULT_TAB_STATE;
+
+  // ── Export jobs table helpers ─────────────────────────────────────────────
+  const activeJobs = exportJobs.filter(j => j.status === 'RUNNING' || j.status === 'QUEUED');
+  const completedJobs = exportJobs.filter(j => j.status === 'COMPLETED' || j.status === 'FAILED');
+
+  const statusColor: Record<string, string> = {
+    QUEUED: 'blue',
+    RUNNING: 'processing',
+    COMPLETED: 'green',
+    FAILED: 'red',
+  };
+
+  const jobIdColumn: ColumnsType<ExportJob>[0] = {
+    title: 'Job ID',
+    dataIndex: 'jobId',
+    key: 'jobId',
+    width: 110,
+    render: (id: string) => (
+      <Tooltip title={id}>
+        {id.substring(0, 8)}
+      </Tooltip>
+    ),
+  };
+
+  const stateColumn: ColumnsType<ExportJob>[0] = {
+    title: 'State',
+    dataIndex: 'state',
+    key: 'state',
+    render: (s: string) => s.replace(/_/g, ' '),
+  };
+
+  const statusColumn: ColumnsType<ExportJob>[0] = {
+    title: 'Status',
+    dataIndex: 'status',
+    key: 'status',
+    width: 120,
+    render: (status: string) => (
+      <Tag color={statusColor[status]}>{status}</Tag>
+    ),
+  };
+
+  // ── Active exports columns (RUNNING / QUEUED) ─────────────────────────────
+  const activeExportColumns: ColumnsType<ExportJob> = [
+    jobIdColumn,
+    stateColumn,
+    statusColumn,
+    {
+      title: 'Queue Position',
+      dataIndex: 'queuePosition',
+      key: 'queuePosition',
+      width: 130,
+      render: (_: number, record: ExportJob) =>
+        record.status === 'QUEUED' && record.queuePosition > 0
+          ? `#${record.queuePosition}`
+          : '—',
+    },
+    {
+      title: 'Progress',
+      key: 'progress',
+      render: (_: unknown, record: ExportJob) => {
+        if (record.status === 'RUNNING') {
+          const pct = record.progressPercent || 0;
+          const processed = record.totalRecords?.toLocaleString() ?? '0';
+          const total = record.estimatedTotal > 0
+            ? record.estimatedTotal.toLocaleString()
+            : '?';
+          return (
+            <div>
+              <Progress percent={pct} size="small" />
+              <div>{processed} / {total} records</div>
+            </div>
+          );
+        }
+        return '—';
+      },
+    },
+  ];
+
+  // ── Completed exports columns (COMPLETED / FAILED) ────────────────────────
+  const completedExportColumns: ColumnsType<ExportJob> = [
+    jobIdColumn,
+    stateColumn,
+    statusColumn,
+    {
+      title: 'Records',
+      dataIndex: 'totalRecords',
+      key: 'totalRecords',
+      render: (n: number, record: ExportJob) =>
+        record.status === 'COMPLETED' ? (n?.toLocaleString() ?? '—') : '—',
+    },
+    {
+      title: 'Action',
+      key: 'action',
+      render: (_: unknown, record: ExportJob) => {
+        if (record.status === 'COMPLETED' && record.filePath) {
+          const filename = record.filePath.split('/').pop() ?? `${record.jobId}.tar`;
+          return (
+            <Tooltip title={filename}>
+              <Button
+                type="primary"
+                size="small"
+                icon={<DownloadOutlined />}
+                onClick={() => downloadFile(record.jobId, record.filePath)}>
+                Download
+              </Button>
+            </Tooltip>
+          );
+        }
+        if (record.status === 'FAILED') {
+          return (
+            <Tooltip title={record.errorMessage ?? 'Failed'}>
+              <Tag color="red">
+                {record.errorMessage ?? 'Failed'}
+              </Tag>
+            </Tooltip>
+          );
+        }
+        return null;
+      },
+    },
+  ];
+
+  // ── Highlights ────────────────────────────────────────────────────────────
   const highlightData = (
+    <div className="highlight-content">
+      <span>Total Containers</span>
+      <span>{totalContainers ?? 'N/A'}</span>
+    </div>
   );
@@ -329,44 +558,44 @@ const Containers: React.FC<{}> = () => {
-      <Row>
-        {highlightData}
-      </Row>
+      {highlightData}
-      <div className="table-header-section">
-        <MultiSelect
-          options={columnOptions}
-          onChange={() => { }}
-          columnLength={columnOptions.length} />
-        <Search
-          searchColumn={searchColumn}
-          searchInput={searchTerm}
-          onSearchChange={(
-            e: React.ChangeEvent<HTMLInputElement>
-          ) => setSearchTerm(e.target.value)
-          }
-          onChange={(value) => {
-            setSearchTerm('');
-            setSearchColumn(value as 'containerID' | 'pipelineID');
-          }} />
-      </div>
-      <Tabs activeKey={selectedTab} onChange={(activeKey) => handleTabChange(activeKey)}>
+      <Tabs activeKey={selectedTab} onChange={handleTabChange}>
+        {/* ── Container data tabs ───────────────────────────────────── */}
+        {(['1','2','3','4','5'] as const).map((key) => (
+          <Tabs.TabPane tab={TAB_STATE_MAP[key]} key={key}>
+            <div className="table-header-section">
+              <MultiSelect
+                options={columnOptions}
+                onChange={() => {}}
+                columnLength={columnOptions.length} />
+              <Search
+                searchColumn={searchColumn}
+                searchInput={searchTerm}
+                onSearchChange={(
+                  e: React.ChangeEvent<HTMLInputElement>
+                ) => setSearchTerm(e.target.value)
+                }
+                onChange={(value) => {
+                  setSearchTerm('');
+                  setSearchColumn(value as 'containerID' | 'pipelineID');
+                }} />
+            </div>
+            <Table
+              dataSource={tabStates[key].data}
+              loading={tabStates[key].loading}
+              pagination={false}
+              rowKey="containerID" />
+          </Tabs.TabPane>
+        ))}
+
+        {/* ── Export tab ────────────────────────────────────────────── */}
+        <Tabs.TabPane
+          key="6"
+          tab={
+            <span>
+              <ExportOutlined /> Export
+            </span>
+          }>
+          <Card>
+            Container State:
+            <Select