diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java new file mode 100644 index 000000000000..379d936e9d96 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.apache.ratis.util.ReferenceCountedObject; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; + +/** + * Utility class to manage a shared background service using a {@link ScheduledExecutorService} + * which is provided with a single-threaded {@link ScheduledThreadPoolExecutor}. + * This class manages the lifecycle and reference counting for the executor + * to ensure proper resource cleanup. + * + * The executor is lazily initialized on the first invocation of the {@code get()} method. + * It is shut down and released when no longer referenced, ensuring efficient use + * of system resources. The shutdown process includes cleaning the reference to the executor. + * + * This class is thread-safe. 
+ */ +final class BackgroundServiceScheduler { + private static ReferenceCountedObject<ScheduledExecutorService> executor; + + private BackgroundServiceScheduler() { + + } + + public static synchronized UncheckedAutoCloseableSupplier<ScheduledExecutorService> get() { + if (executor == null) { + ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1); + executor = ReferenceCountedObject.wrap(scheduler, () -> { }, (shutdown) -> { + if (shutdown) { + synchronized (BackgroundServiceScheduler.class) { + scheduler.shutdown(); + executor = null; + } + } + }); + } + return executor.retainAndReleaseOnClose(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index aaa14321011e..8652924b52e2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -258,11 +259,15 @@ private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo) setStopAfterDiskEven(diskBalancerInfo.isStopAfterDiskEven()); setVersion(diskBalancerInfo.getVersion()); - // Default executorService is ScheduledThreadPoolExecutor, so we can - // update the poll size by setting corePoolSize. - if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) { - ((ScheduledThreadPoolExecutor) getExecutorService()) + Object executorService = getExecutorService(); + if (executorService instanceof ScheduledThreadPoolExecutor) { + // Update the pool size by setting corePoolSize for ScheduledThreadPoolExecutor + ((ScheduledThreadPoolExecutor) executorService) .setCorePoolSize(parallelThread); + } else if (executorService instanceof ForkJoinPool) { + // For ForkJoinPool, dynamic resizing is not supported and requires service restart + LOG.warn("ForkJoinPool doesn't support dynamic pool size changes. " + + "Service restart is required for pool size change to take effect."); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java index 8e41ab686a2e..73a53e60e381 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java @@ -68,7 +68,7 @@ int getTimesOfProcessed() { // Override the implementation to start a single on-call control thread. @Override public void start() { - PeriodicalTask svc = new PeriodicalTask(); + PeriodicalTask svc = new PeriodicalTask(null); // In test mode, relies on a latch countdown to runDeletingTasks tasks. 
Runnable r = () -> { while (true) { diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java index ef1503219070..c3746a1ba13b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java @@ -65,7 +65,7 @@ public int getTimesOfProcessed() { // Override the implementation to start a single on-call control thread. @Override public void start() { - PeriodicalTask svc = new PeriodicalTask(); + PeriodicalTask svc = new PeriodicalTask(null); // In test mode, relies on a latch countdown to runDeletingTasks tasks. Runnable r = () -> { while (true) { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 144d1725fdb5..2cb8db3c655e 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -18,14 +18,18 @@ package org.apache.hadoop.hdds.utils; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.RecursiveAction; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,20 +41,19 @@ */ public abstract class BackgroundService { - protected static final Logger LOG = - LoggerFactory.getLogger(BackgroundService.class); + protected static final Logger LOG = LoggerFactory.getLogger(BackgroundService.class); // Executor to launch child tasks - private ScheduledThreadPoolExecutor exec; + private UncheckedAutoCloseableSupplier periodicTaskScheduler; + private volatile ForkJoinPool exec; private ThreadGroup threadGroup; private final String serviceName; - private long interval; + private volatile long intervalInMillis; private volatile long serviceTimeoutInNanos; - private TimeUnit unit; - private final int threadPoolSize; + private volatile int threadPoolSize; private final String threadNamePrefix; - private final PeriodicalTask service; - private CompletableFuture future; + private volatile CompletableFuture future; + private volatile AtomicReference isShutdown; public BackgroundService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long serviceTimeout) { @@ -60,15 +63,13 @@ public BackgroundService(String serviceName, long interval, public BackgroundService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long 
serviceTimeout, String threadNamePrefix) { - this.interval = interval; - this.unit = unit; + setInterval(interval, unit); this.serviceName = serviceName; this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit) .toLong(TimeUnit.NANOSECONDS); this.threadPoolSize = threadPoolSize; this.threadNamePrefix = threadNamePrefix; initExecutorAndThreadGroup(); - service = new PeriodicalTask(); this.future = CompletableFuture.completedFuture(null); } @@ -77,22 +78,23 @@ protected CompletableFuture getFuture() { } @VisibleForTesting - public synchronized ExecutorService getExecutorService() { + public synchronized ForkJoinPool getExecutorService() { return this.exec; } - public synchronized void setPoolSize(int size) { + /** + * Set the pool size for background service. This would require a shutdown and restart of the service for the + * change to take effect. + * @param size + */ + public void setPoolSize(int size) { if (size <= 0) { throw new IllegalArgumentException("Pool size must be positive."); } - - // In ScheduledThreadPoolExecutor, maximumPoolSize is Integer.MAX_VALUE - // the corePoolSize will always less maximumPoolSize. - // So we can directly set the corePoolSize - exec.setCorePoolSize(size); + this.threadPoolSize = size; } - public synchronized void setServiceTimeoutInNanos(long newTimeout) { + public void setServiceTimeoutInNanos(long newTimeout) { LOG.info("{} timeout is set to {} {}", serviceName, newTimeout, TimeUnit.NANOSECONDS.name().toLowerCase()); this.serviceTimeoutInNanos = newTimeout; } @@ -104,7 +106,7 @@ public int getThreadCount() { @VisibleForTesting public void runPeriodicalTaskNow() throws Exception { - BackgroundTaskQueue tasks = getTasks(); + BackgroundTaskQueue tasks = getTasks(false); while (!tasks.isEmpty()) { tasks.poll().call(); } @@ -116,18 +118,20 @@ public synchronized void start() { if (exec == null || exec.isShutdown() || exec.isTerminated()) { initExecutorAndThreadGroup(); } - LOG.info("Starting service {} with interval {} {}", serviceName, - interval, unit.name().toLowerCase()); - exec.scheduleWithFixedDelay(service, 0, interval, unit); + LOG.info("Starting service {} with interval {} ms", serviceName, intervalInMillis); + exec.execute(new PeriodicalTask(periodicTaskScheduler.get())); } - protected synchronized void setInterval(long newInterval, TimeUnit newUnit) { - this.interval = newInterval; - this.unit = newUnit; + protected void setInterval(long newInterval, TimeUnit newUnit) { + this.intervalInMillis = TimeDuration.valueOf(newInterval, newUnit).toLong(TimeUnit.MILLISECONDS); } - protected synchronized long getIntervalMillis() { - return this.unit.toMillis(interval); + protected long getIntervalMillis() { + return intervalInMillis; + } + + public BackgroundTaskQueue getTasks(boolean allowTasksToFork) { + return getTasks(); } public abstract BackgroundTaskQueue getTasks(); @@ -138,84 +142,183 @@ protected void execTaskCompletion() { } * Run one or more background tasks concurrently. * Wait until all tasks to return the result. 
*/ - public class PeriodicalTask implements Runnable { - @Override - public void run() { - // wait for previous set of tasks to complete - try { - future.join(); - } catch (RuntimeException e) { - LOG.error("Background service execution failed.", e); - } finally { - execTaskCompletion(); - } + public class PeriodicalTask extends RecursiveAction { + private final Queue tasksInFlight; + private final AtomicReference isShutdown; + private final ScheduledExecutorService scheduledExecuterService; + + public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) { + this.tasksInFlight = new LinkedList<>(); + this.isShutdown = BackgroundService.this.isShutdown; + this.scheduledExecuterService = scheduledExecutorService; + } + + private PeriodicalTask(PeriodicalTask other) { + this.tasksInFlight = other.tasksInFlight; + this.isShutdown = other.isShutdown; + this.scheduledExecuterService = other.scheduledExecuterService; + } + + private boolean performIfNotShutdown(Runnable runnable) { + return isShutdown.updateAndGet((shutdown) -> { + if (!shutdown) { + runnable.run(); + } + return shutdown; + }); + } + private boolean performIfNotShutdown(Consumer consumer, T t) { + return isShutdown.updateAndGet((shutdown) -> { + if (!shutdown) { + consumer.accept(t); + } + return shutdown; + }); + } + + private boolean runTasks() { if (LOG.isDebugEnabled()) { LOG.debug("Running background service : {}", serviceName); } - BackgroundTaskQueue tasks = getTasks(); + if (isShutdown.get()) { + return false; + } + if (!tasksInFlight.isEmpty()) { + LOG.warn("Tasks are still in flight service {}. This should not happen schedule should only begin once all " + + "tasks from schedules have completed execution.", serviceName); + tasksInFlight.clear(); + } + + BackgroundTaskQueue tasks = getTasks(true); if (tasks.isEmpty()) { // No task found, or some problems to init tasks // return and retry in next interval. - return; + return false; } if (LOG.isDebugEnabled()) { LOG.debug("Number of background tasks to execute : {}", tasks.size()); } - synchronized (BackgroundService.this) { - while (!tasks.isEmpty()) { - BackgroundTask task = tasks.poll(); - future = future.thenCombine(CompletableFuture.runAsync(() -> { - long startTime = System.nanoTime(); - try { - BackgroundTaskResult result = task.call(); - if (LOG.isDebugEnabled()) { - LOG.debug("task execution result size {}", result.getSize()); - } - } catch (Throwable e) { - LOG.error("Background task execution failed", e); - if (e instanceof Error) { - throw (Error) e; - } - } finally { - long endTime = System.nanoTime(); - if (endTime - startTime > serviceTimeoutInNanos) { - LOG.warn("{} Background task execution took {}ns > {}ns(timeout)", - serviceName, endTime - startTime, serviceTimeoutInNanos); - } - } - }, exec).exceptionally(e -> null), (Void1, Void) -> null); + Consumer taskForkHandler = task -> { + task.fork(); + tasksInFlight.offer(task); + }; + while (!tasks.isEmpty()) { + BackgroundTask task = tasks.poll(); + // Wrap the task in a ForkJoin wrapper and fork it. + BackgroundTaskForkJoin forkJoinTask = new BackgroundTaskForkJoin(task); + if (performIfNotShutdown(taskForkHandler, forkJoinTask)) { + return false; + } + } + Consumer taskCompletionHandler = task -> { + BackgroundTaskForkJoin.BackgroundTaskForkResult result = task.join(); + // Check for exception first in the task execution. 
+ if (result.getThrowable() != null) { + LOG.error("Background task execution failed", result.getThrowable()); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("task execution result size {}", result.getResult().getSize()); + } + } + if (result.getTotalExecutionTime() > serviceTimeoutInNanos) { + LOG.warn("{} Background task execution took {}ns > {}ns(timeout)", + serviceName, result.getTotalExecutionTime(), serviceTimeoutInNanos); + } + }; + while (!tasksInFlight.isEmpty()) { + BackgroundTaskForkJoin taskInFlight = tasksInFlight.poll(); + // Join the tasks forked before and wait for the result one by one. + if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) { + return false; } } + return true; + } + + private void scheduleNextTask() { + performIfNotShutdown(() -> { + if (scheduledExecuterService != null) { + scheduledExecuterService.schedule(() -> exec.submit(new PeriodicalTask(this)), + intervalInMillis, TimeUnit.MILLISECONDS); + } + }); + } + + @Override + public void compute() { + future = new CompletableFuture<>(); + if (runTasks()) { + scheduleNextTask(); + } else { + LOG.debug("Service {} is shutdown. Cancelling all schedules of all tasks.", serviceName); + } + future.complete(null); } } // shutdown and make sure all threads are properly released. - public synchronized void shutdown() { + public void shutdown() { LOG.info("Shutting down service {}", this.serviceName); - exec.shutdown(); - try { - if (!exec.awaitTermination(60, TimeUnit.SECONDS)) { - exec.shutdownNow(); + final ThreadGroup threadGroupToBeClosed; + final ForkJoinPool execToShutdown; + final UncheckedAutoCloseableSupplier periodicTaskSchedulerToBeClosed; + // Set the shutdown flag to true to prevent new tasks from being submitted. + synchronized (this) { + periodicTaskSchedulerToBeClosed = periodicTaskScheduler; + threadGroupToBeClosed = threadGroup; + execToShutdown = exec; + exec = null; + threadGroup = null; + periodicTaskScheduler = null; + if (isShutdown != null) { + this.isShutdown.set(true); + } + isShutdown = null; + } + if (execToShutdown != null) { + execToShutdown.shutdown(); + try { + if (!execToShutdown.awaitTermination(60, TimeUnit.SECONDS)) { + execToShutdown.shutdownNow(); + } + } catch (InterruptedException e) { + // Re-interrupt the thread while catching InterruptedException + Thread.currentThread().interrupt(); + execToShutdown.shutdownNow(); } - } catch (InterruptedException e) { - // Re-interrupt the thread while catching InterruptedException - Thread.currentThread().interrupt(); - exec.shutdownNow(); } - if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) { - threadGroup.destroy(); + if (periodicTaskSchedulerToBeClosed != null) { + periodicTaskSchedulerToBeClosed.close(); + } + if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed()) { + threadGroupToBeClosed.destroy(); } } - private void initExecutorAndThreadGroup() { - threadGroup = new ThreadGroup(serviceName); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setThreadFactory(r -> new Thread(threadGroup, r)) - .setDaemon(true) - .setNameFormat(threadNamePrefix + serviceName + "#%d") - .build(); - exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadPoolSize, threadFactory); + private synchronized void initExecutorAndThreadGroup() { + try { + threadGroup = new ThreadGroup(serviceName); + Thread initThread = new Thread(threadGroup, () -> { + ForkJoinPool.ForkJoinWorkerThreadFactory factory = + pool -> { + ForkJoinWorkerThread thread = new 
ForkJoinWorkerThread(pool) { + }; + thread.setDaemon(true); + thread.setName(threadNamePrefix + serviceName + thread.getPoolIndex()); + return thread; + }; + exec = new ForkJoinPool(threadPoolSize, factory, null, false); + isShutdown = new AtomicReference<>(false); + }); + initThread.start(); + initThread.join(); + periodicTaskScheduler = BackgroundServiceScheduler.get(); + } catch (InterruptedException e) { + shutdown(); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } } protected String getServiceName() { diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java new file mode 100644 index 000000000000..fe8b3e64ba62 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils; + +import java.util.concurrent.RecursiveTask; + +/** + * A ForkJoin wrapper for {@link BackgroundTask} that enables parallel execution + * in a ForkJoinPool while keeping the BackgroundTask interface simple. + * + *
<p>
This wrapper handles the RecursiveTask mechanics, timing, and exception + * handling, allowing BackgroundTask implementations to focus on their business logic. + */ +public class BackgroundTaskForkJoin extends RecursiveTask { + private static final long serialVersionUID = 1L; + private final transient BackgroundTask backgroundTask; + + public BackgroundTaskForkJoin(BackgroundTask backgroundTask) { + this.backgroundTask = backgroundTask; + } + + /** + * Result wrapper containing the task result, execution time, and any exception. + */ + public static final class BackgroundTaskForkResult { + private final BackgroundTaskResult result; + private final Throwable throwable; + private final long startTime; + private final long endTime; + + private BackgroundTaskForkResult(BackgroundTaskResult result, long startTime, long endTime, Throwable throwable) { + this.endTime = endTime; + this.result = result; + this.startTime = startTime; + this.throwable = throwable; + } + + public long getTotalExecutionTime() { + return endTime - startTime; + } + + public BackgroundTaskResult getResult() { + return result; + } + + public Throwable getThrowable() { + return throwable; + } + } + + @Override + protected BackgroundTaskForkResult compute() { + long startTime = System.nanoTime(); + BackgroundTaskResult result = null; + Throwable throwable = null; + try { + result = backgroundTask.call(); + } catch (Throwable e) { + throwable = e; + } + long endTime = System.nanoTime(); + return new BackgroundTaskForkResult(result, startTime, endTime, throwable); + } + + public int getPriority() { + return backgroundTask.getPriority(); + } + + public BackgroundTask getBackgroundTask() { + return backgroundTask; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java index 9dc8332697be..e1a91bdae255 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java @@ -187,7 +187,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IO } @Override - public BackgroundTaskResult call() throws Exception { + public BackgroundTaskResult call() { Optional snapshotManager = Optional.ofNullable(ozoneManager) .map(OzoneManager::getOmSnapshotManager); @@ -255,6 +255,9 @@ public BackgroundTaskResult call() throws Exception { .getSnapshotId()); } } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("SST filtering task interrupted for snapshot: {}", snapShotTableKey, e); } } catch (IOException e) { if (isSnapshotDeleted(snapshotInfoTable.get(snapShotTableKey))) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index 8e43a9fc8966..000723e1c401 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -106,6 +106,28 @@ boolean isPreviousPurgeTransactionFlushed() throws IOException { return true; } + private static final class BackgroundDeleteTask implements BackgroundTask { + private final BootstrapStateHandler.Lock bootstrapLock; + 
private final BackgroundTask task; + + private BackgroundDeleteTask(BootstrapStateHandler.Lock bootstrapLock, BackgroundTask task) { + this.bootstrapLock = bootstrapLock; + this.task = task; + } + + @Override + public BackgroundTaskResult call() throws Exception { + try (UncheckedAutoCloseable readLock = bootstrapLock.acquireReadLock()) { + return task.call(); + } + } + + @Override + public int getPriority() { + return task.getPriority(); + } + } + /** * A specialized implementation of {@link BackgroundTaskQueue} that modifies * the behavior of added tasks to utilize a read lock during execution. @@ -119,20 +141,7 @@ boolean isPreviousPurgeTransactionFlushed() throws IOException { public class DeletingServiceTaskQueue extends BackgroundTaskQueue { @Override public synchronized void add(BackgroundTask task) { - super.add(new BackgroundTask() { - - @Override - public BackgroundTaskResult call() throws Exception { - try (UncheckedAutoCloseable readLock = getBootstrapStateLock().acquireReadLock()) { - return task.call(); - } - } - - @Override - public int getPriority() { - return task.getPriority(); - } - }); + super.add(new BackgroundDeleteTask(lock, task)); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index e99b36269607..9603ab5d82a4 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.areSnapshotChangesFlushedToDB; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -34,17 +35,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Queue; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -155,11 +154,10 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { // Using multi thread for DirDeletion. Multiple threads would read // from parent directory info from deleted directory table concurrently // and send deletion requests. 
- private int ratisByteLimit; + private final int ratisByteLimit; private final SnapshotChainManager snapshotChainManager; private final boolean deepCleanSnapshots; - private ExecutorService deletionThreadPool; - private final int numberOfParallelThreadsPerStore; + private final int maxForksPerStore; private final AtomicLong deletedDirsCount; private final AtomicLong movedDirsCount; private final AtomicLong movedFilesCount; @@ -174,9 +172,7 @@ public DirectoryDeletingService(long interval, TimeUnit unit, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, StorageUnit.BYTES); - this.numberOfParallelThreadsPerStore = dirDeletingServiceCorePoolSize; - this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, - interval, unit, new LinkedBlockingDeque<>(Integer.MAX_VALUE)); + this.maxForksPerStore = dirDeletingServiceCorePoolSize; // always go to 90% of max limit for request as other header will be added this.ratisByteLimit = (int) (limit * 0.9); registerReconfigCallbacks(ozoneManager.getReconfigurationHandler()); @@ -214,8 +210,13 @@ private synchronized void updateAndRestart(OzoneConfiguration conf) { @Override public DeletingServiceTaskQueue getTasks() { + return getTasks(false); + } + + @Override + public DeletingServiceTaskQueue getTasks(boolean allowTasksToFork) { DeletingServiceTaskQueue queue = new DeletingServiceTaskQueue(); - queue.add(new DirDeletingTask(null)); + queue.add(new DirDeletingTask(null, allowTasksToFork, this)); if (deepCleanSnapshots) { Iterator iterator = null; try { @@ -226,41 +227,12 @@ public DeletingServiceTaskQueue getTasks() { } while (iterator.hasNext()) { UUID snapshotId = iterator.next(); - queue.add(new DirDeletingTask(snapshotId)); + queue.add(new DirDeletingTask(snapshotId, allowTasksToFork, this)); } } return queue; } - @Override - public void shutdown() { - if (deletionThreadPool != null) { - deletionThreadPool.shutdown(); - try { - if (!deletionThreadPool.awaitTermination(60, TimeUnit.SECONDS)) { - deletionThreadPool.shutdownNow(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - deletionThreadPool.shutdownNow(); - } - } - super.shutdown(); - } - - @Override - public synchronized void start() { - if (deletionThreadPool == null || deletionThreadPool.isShutdown() || deletionThreadPool.isTerminated()) { - this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, - super.getIntervalMillis(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE)); - } - super.start(); - } - - private boolean isThreadPoolActive(ExecutorService threadPoolExecutor) { - return threadPoolExecutor != null && !threadPoolExecutor.isShutdown() && !threadPoolExecutor.isTerminated(); - } - @SuppressWarnings("checkstyle:ParameterNumber") void optimizeDirDeletesAndSubmitRequest( long dirNum, long subDirNum, long subFileNum, @@ -550,11 +522,16 @@ OzoneManagerProtocolProtos.OMResponse submitPurgeRequest(String snapTableKey, } @VisibleForTesting - final class DirDeletingTask implements BackgroundTask { + static final class DirDeletingTask implements BackgroundTask { + private final UUID snapshotId; + private final boolean allowForks; + private final DirectoryDeletingService dds; - DirDeletingTask(UUID snapshotId) { + DirDeletingTask(UUID snapshotId, boolean allowForks, DirectoryDeletingService dds) { this.snapshotId = snapshotId; + this.allowForks = allowForks; + this.dds = dds; } @Override @@ -569,19 +546,33 @@ private 
OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ .setExclusiveReplicatedSize(exclusiveReplicatedSize) .build(); return OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder() - .setSnapshotKey(snapshotChainManager.getTableKey(snapshotID)) + .setSnapshotKey(dds.snapshotChainManager.getTableKey(snapshotID)) .setSnapshotSizeDeltaFromDirDeepCleaning(snapshotSize) .build(); } /** + * Processes deleted directories for the given snapshot and store with optional + * parallelization depending on the configuration. Updates snapshot-related + * properties such as exclusive size mappings and deep clean flags upon successful + * processing. * - * @param currentSnapshotInfo if null, deleted directories in AOS should be processed. - * @param keyManager KeyManager of the underlying store. + * @param currentSnapshotInfo Information about the current snapshot whose deleted + * directories are being processed. Null if processing + * for the global store. + * @param keyManager Key manager responsible for handling operations related to + * deleted directories and keys. + * @param rnCnt Run count indicating the number of iterations the task has been + * executed. + * @param remainNum Remaining number of directories to process in this task. + * @throws IOException If an I/O error occurs during the processing of deleted + * directories. + * @throws ExecutionException If an error occurs while executing the task in parallel. + * @throws InterruptedException If the task execution is interrupted. */ @VisibleForTesting - void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, long rnCnt, int remainNum) - throws IOException, ExecutionException, InterruptedException { + void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, long rnCnt, + int remainNum) throws IOException, ExecutionException, InterruptedException { String volume, bucket; String snapshotTableKey; if (currentSnapshotInfo != null) { volume = currentSnapshotInfo.getVolumeName(); @@ -596,26 +587,54 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key // This is to avoid race condition b/w purge request and snapshot chain update. For AOS taking the global // snapshotId since AOS could process multiple buckets in one iteration. While using path // previous snapshotId for a snapshot since it would process only one bucket. + SnapshotChainManager snapshotChainManager = dds.snapshotChainManager; UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ? snapshotChainManager.getLatestGlobalSnapshotId() : SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo, snapshotChainManager); Map> exclusiveSizeMap = Maps.newConcurrentMap(); - CompletableFuture processedAllDeletedDirs = CompletableFuture.completedFuture(true); - for (int i = 0; i < numberOfParallelThreadsPerStore; i++) { - CompletableFuture future = CompletableFuture.supplyAsync(() -> { - try { - return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, - expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, remainNum); - } catch (Throwable e) { - return false; - } - }, isThreadPoolActive(deletionThreadPool) ? deletionThreadPool : ForkJoinPool.commonPool()); - processedAllDeletedDirs = processedAllDeletedDirs.thenCombine(future, (a, b) -> a && b); + boolean processedAllDeletedDirs; + int maxForksPerStore = dds.maxForksPerStore; + // If allowed to fork, create multiple tasks to process deleted directories tasks in parallel. 
+ if (allowForks) { + Queue> recursiveTasks = new LinkedList<>(); + processedAllDeletedDirs = true; + for (int i = 0; i < maxForksPerStore; i++) { + RecursiveTask task = new RecursiveTask() { + private static final long serialVersionUID = 1L; + private final transient SnapshotInfo snapshotInfo = currentSnapshotInfo; + private final transient DeletedDirSupplier deletedDirSupplier = dirSupplier; + private final transient KeyManager km = keyManager; + + @Override + protected Boolean compute() { + try { + return processDeletedDirectories(snapshotInfo, km, deletedDirSupplier, + expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, remainNum); + } catch (Throwable e) { + return false; + } + } + }; + task.fork(); + recursiveTasks.offer(task); + } + while (!recursiveTasks.isEmpty()) { + processedAllDeletedDirs &= recursiveTasks.poll().join(); + } + } else { + try { + // Execute the same sequentially in the same thread. + processedAllDeletedDirs = processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, + expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, + (int) Math.min(Integer.MAX_VALUE, (long)remainNum * maxForksPerStore)); + } catch (Throwable e) { + processedAllDeletedDirs = false; + } } // If AOS or all directories have been processed for snapshot, update snapshot size delta and deep clean flag // if it is a snapshot. - if (processedAllDeletedDirs.get()) { + if (processedAllDeletedDirs) { List setSnapshotPropertyRequests = new ArrayList<>(); for (Map.Entry> entry : exclusiveSizeMap.entrySet()) { @@ -633,7 +652,7 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key .setDeepCleanedDeletedDir(true) .build()); } - submitSetSnapshotRequests(setSnapshotPropertyRequests); + dds.submitSetSnapshotRequests(setSnapshotPropertyRequests); } } } @@ -656,13 +675,13 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, DeletedDirSupplier dirSupplier, UUID expectedPreviousSnapshotId, Map> totalExclusiveSizeMap, long runCount, int remaining) { - OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); - IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock(); + OmSnapshotManager omSnapshotManager = dds.getOzoneManager().getOmSnapshotManager(); + IOzoneManagerLock lock = dds.getOzoneManager().getMetadataManager().getLock(); String snapshotTableKey = currentSnapshotInfo == null ? 
null : currentSnapshotInfo.getTableKey(); - try (ReclaimableDirFilter reclaimableDirFilter = new ReclaimableDirFilter(getOzoneManager(), - omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock); - ReclaimableKeyFilter reclaimableFileFilter = new ReclaimableKeyFilter(getOzoneManager(), - omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock)) { + try (ReclaimableDirFilter reclaimableDirFilter = new ReclaimableDirFilter(dds.getOzoneManager(), + omSnapshotManager, dds.snapshotChainManager, currentSnapshotInfo, keyManager, lock); + ReclaimableKeyFilter reclaimableFileFilter = new ReclaimableKeyFilter(dds.getOzoneManager(), + omSnapshotManager, dds.snapshotChainManager, currentSnapshotInfo, keyManager, lock)) { long startTime = Time.monotonicNow(); long dirNum = 0L; long subDirNum = 0L; @@ -688,10 +707,10 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM .build()); boolean isDirReclaimable = reclaimableDirFilter.apply(pendingDeletedDirInfo); - Optional request = prepareDeleteDirRequest( + Optional request = dds.prepareDeleteDirRequest( pendingDeletedDirInfo.getValue(), pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList, - getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum); + dds.getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum); if (!request.isPresent()) { continue; } @@ -705,9 +724,9 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM subFileNum += purgePathRequest.getDeletedSubFilesCount(); } - optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum, + dds.optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum, subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey, - startTime, getOzoneManager().getKeyManager(), + startTime, dds.getOzoneManager().getKeyManager(), reclaimableDirFilter, reclaimableFileFilter, bucketNameInfos, expectedPreviousSnapshotId, runCount, remainNum); Map exclusiveReplicatedSizeMap = reclaimableFileFilter.getExclusiveReplicatedSizeMap(); @@ -738,38 +757,38 @@ startTime, getOzoneManager().getKeyManager(), public BackgroundTaskResult call() { // Check if this is the Leader OM. If not leader, no need to execute this // task. - if (shouldRun()) { - final long run = getRunCount().incrementAndGet(); + if (dds.shouldRun()) { + final long run = dds.getRunCount().incrementAndGet(); if (snapshotId == null) { LOG.debug("Running DirectoryDeletingService for active object store, {}", run); } else { LOG.debug("Running DirectoryDeletingService for snapshot : {}, {}", snapshotId, run); } - OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); + OmSnapshotManager omSnapshotManager = dds.getOzoneManager().getOmSnapshotManager(); SnapshotInfo snapInfo = null; try { snapInfo = snapshotId == null ? null : - SnapshotUtils.getSnapshotInfo(getOzoneManager(), snapshotChainManager, snapshotId); + SnapshotUtils.getSnapshotInfo(dds.getOzoneManager(), dds.snapshotChainManager, snapshotId); if (snapInfo != null) { if (snapInfo.isDeepCleanedDeletedDir()) { LOG.info("Snapshot {} has already been deep cleaned directory. 
Skipping the snapshot in this iteration.", snapInfo.getSnapshotId()); return BackgroundTaskResult.EmptyTaskResult.newResult(); } - if (!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(), snapInfo)) { + if (!areSnapshotChangesFlushedToDB(dds.getOzoneManager().getMetadataManager(), snapInfo)) { LOG.info("Skipping snapshot processing since changes to snapshot {} have not been flushed to disk", snapInfo); return BackgroundTaskResult.EmptyTaskResult.newResult(); } - } else if (!isPreviousPurgeTransactionFlushed()) { + } else if (!dds.isPreviousPurgeTransactionFlushed()) { return BackgroundTaskResult.EmptyTaskResult.newResult(); } try (UncheckedAutoCloseableSupplier omSnapshot = snapInfo == null ? null : omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(), snapInfo.getBucketName(), snapInfo.getName())) { - KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager() + KeyManager keyManager = snapInfo == null ? dds.getOzoneManager().getKeyManager() : omSnapshot.get().getKeyManager(); - processDeletedDirsForStore(snapInfo, keyManager, run, pathLimitPerTask); + processDeletedDirsForStore(snapInfo, keyManager, run, dds.pathLimitPerTask); } } catch (IOException | ExecutionException e) { LOG.error("Error while running delete files background task for store {}. Will retry at next run.", diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java index f3c91b6f4d88..4a6af86014fc 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java @@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.ozone.om.service.DirectoryDeletingService.DirDeletingTask; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; @@ -226,8 +227,8 @@ public void testMultithreadedDirectoryDeletion() throws Exception { return future; }); ozoneManager.getKeyManager().getDirDeletingService().suspend(); - DirectoryDeletingService.DirDeletingTask dirDeletingTask = - ozoneManager.getKeyManager().getDirDeletingService().new DirDeletingTask(null); + DirDeletingTask dirDeletingTask = new DirDeletingTask(null, false, + ozoneManager.getKeyManager().getDirDeletingService()); dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 6000); assertThat(futureList).hasSize(threadCount);
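Illustrative sketch (not part of the patch above): the rewritten BackgroundService.PeriodicalTask follows a self-rescheduling fork/join pattern — fork one BackgroundTaskForkJoin per task into the service's ForkJoinPool, join them all, then ask the shared single-threaded ScheduledExecutorService from BackgroundServiceScheduler to re-submit the next cycle after the configured interval. The minimal standalone Java example below shows that pattern under simplified assumptions; the names used (DemoCycle, POOL, TIMER, SHUTDOWN) are hypothetical and exist only for this sketch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DemoCycle extends RecursiveAction {
  // One worker pool for the service and one shared timer thread, mirroring the
  // ForkJoinPool in BackgroundService and the scheduler from BackgroundServiceScheduler.
  private static final ForkJoinPool POOL = new ForkJoinPool(2);
  private static final ScheduledExecutorService TIMER = Executors.newSingleThreadScheduledExecutor();
  private static final AtomicBoolean SHUTDOWN = new AtomicBoolean(false);

  @Override
  protected void compute() {
    // Fork a batch of subtasks, then join them all before rescheduling,
    // as runTasks() does with its BackgroundTaskForkJoin instances.
    List<RecursiveTask<Long>> forked = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      RecursiveTask<Long> subtask = new RecursiveTask<Long>() {
        @Override
        protected Long compute() {
          return System.nanoTime(); // stand-in for BackgroundTask.call()
        }
      };
      subtask.fork();
      forked.add(subtask);
    }
    forked.forEach(RecursiveTask::join);

    // Re-submit the next cycle after a fixed delay unless shut down,
    // as scheduleNextTask() does via the shared scheduler.
    if (!SHUTDOWN.get()) {
      TIMER.schedule(() -> POOL.execute(new DemoCycle()), 1, TimeUnit.SECONDS);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    POOL.execute(new DemoCycle()); // start(): submit the first cycle directly
    Thread.sleep(3_000);
    SHUTDOWN.set(true);            // shutdown(): stop rescheduling, then release executors
    TIMER.shutdown();
    POOL.shutdown();
  }
}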