diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index eb6466a3a0..4f10b2fd4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.logaggregation; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Timer; import java.util.TimerTask; @@ -57,7 +59,7 @@ public class AggregatedLogDeletionService extends AbstractService { private Timer timer = null; private long checkIntervalMsecs; - private LogDeletionTask task; + private List tasks; public static class LogDeletionTask extends TimerTask { private Configuration conf; @@ -66,14 +68,12 @@ public static class LogDeletionTask extends TimerTask { private Path remoteRootLogDir = null; private ApplicationClientProtocol rmClient = null; - public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) { + public LogDeletionTask(Configuration conf, long retentionSecs, + ApplicationClientProtocol rmClient, + LogAggregationFileController fileController) { this.conf = conf; this.retentionMillis = retentionSecs * 1000; this.suffix = LogAggregationUtils.getBucketSuffix(); - LogAggregationFileControllerFactory factory = - new LogAggregationFileControllerFactory(conf); - LogAggregationFileController fileController = - factory.getFileControllerForWrite(); this.remoteRootLogDir = fileController.getRemoteRootLogDir(); this.rmClient = rmClient; } @@ -220,7 +220,7 @@ public AggregatedLogDeletionService() { @Override protected void serviceStart() throws Exception { - scheduleLogDeletionTask(); + scheduleLogDeletionTasks(); super.serviceStart(); } @@ -249,13 +249,13 @@ public void refreshLogRetentionSettings() throws IOException { setConfig(conf); stopRMClient(); stopTimer(); - scheduleLogDeletionTask(); + scheduleLogDeletionTasks(); } else { LOG.warn("Failed to execute refreshLogRetentionSettings : Aggregated Log Deletion Service is not started"); } } - private void scheduleLogDeletionTask() throws IOException { + private void scheduleLogDeletionTasks() throws IOException { Configuration conf = getConfig(); if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) { @@ -271,9 +271,28 @@ private void scheduleLogDeletionTask() throws IOException { return; } setLogAggCheckIntervalMsecs(retentionSecs); - task = new LogDeletionTask(conf, retentionSecs, createRMClient()); - timer = new Timer(); - timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); + + tasks = createLogDeletionTasks(conf, retentionSecs, createRMClient()); + for (LogDeletionTask task : tasks) { + timer = new Timer(); + timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs); + } + } + + @VisibleForTesting + public List createLogDeletionTasks(Configuration conf, long retentionSecs, + ApplicationClientProtocol rmClient) + throws IOException { + List tasks = new ArrayList<>(); + LogAggregationFileControllerFactory factory = new LogAggregationFileControllerFactory(conf); + List fileControllers = + factory.getConfiguredLogAggregationFileControllerList(); + for (LogAggregationFileController fileController : fileControllers) { + LogDeletionTask task = new LogDeletionTask(conf, retentionSecs, rmClient, + fileController); + tasks.add(task); + } + return tasks; } private void stopTimer() { @@ -295,14 +314,18 @@ protected Configuration createConf() { // as @Idempotent, it will automatically take care of RM restart/failover. @VisibleForTesting protected ApplicationClientProtocol createRMClient() throws IOException { - return ClientRMProxy.createRMProxy(getConfig(), - ApplicationClientProtocol.class); + return ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class); } @VisibleForTesting protected void stopRMClient() { - if (task != null && task.getRMClient() != null) { - RPC.stopProxy(task.getRMClient()); + for (LogDeletionTask task : tasks) { + if (task != null && task.getRMClient() != null) { + RPC.stopProxy(task.getRMClient()); + //The RMClient instance is the same for all deletion tasks. + //It is enough to close the RM client once + break; + } } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index 13a9afa84e..285ac43322 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -42,6 +42,7 @@ import java.util.List; import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT; +import static org.apache.hadoop.yarn.logaggregation.LogAggregationTestUtils.enableFileControllers; import static org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.NO_TIMEOUT; import static org.mockito.Mockito.mock; @@ -118,12 +119,12 @@ public void testDeletion() throws Exception { .withRunningApps(4) .injectExceptionForAppDirDeletion(3) .build() - .setupAndRunDeletionService() + .startDeletionService() .verifyAppDirsDeleted(timeout, 1, 3) .verifyAppDirsNotDeleted(timeout, 2, 4) .verifyAppFileDeleted(4, 1, timeout) .verifyAppFileNotDeleted(4, 2, timeout) - .teardown(); + .teardown(1); } @Test @@ -155,7 +156,7 @@ public void testRefreshLogRetentionSettings() throws Exception { .build(); testcase - .setupAndRunDeletionService() + .startDeletionService() //app1Dir would be deleted since it is done above log retention period .verifyAppDirDeleted(1, 10000L) //app2Dir is not expected to be deleted since it is below the threshold @@ -176,7 +177,8 @@ public void testRefreshLogRetentionSettings() throws Exception { .verifyCheckIntervalMilliSecondsEqualTo(checkIntervalMilliSeconds) //app2Dir should be deleted since it falls above the threshold .verifyAppDirDeleted(2, 10000L) - .teardown(); + //Close expected 2 times: once for refresh and once for stopping + .teardown(2); } @Test @@ -202,7 +204,7 @@ public void testCheckInterval() throws Exception { .withFinishedApps(1) .withRunningApps() .build() - .setupAndRunDeletionService() + .startDeletionService() .verifyAnyPathListedAtLeast(4, 10000L) .verifyAppDirNotDeleted(1, NO_TIMEOUT) // modify the timestamp of the logs and verify if it is picked up quickly @@ -211,7 +213,7 @@ public void testCheckInterval() throws Exception { .changeModTimeOfBucketDir(toDeleteTime) .reinitAllPaths() .verifyAppDirDeleted(1, 10000L) - .teardown(); + .teardown(1); } @Test @@ -241,6 +243,59 @@ public void testRobustLogDeletion() throws Exception { .verifyAppDirDeleted(3, NO_TIMEOUT); } + @Test + public void testDeletionTwoControllers() throws IOException { + long now = System.currentTimeMillis(); + long toDeleteTime = now - (2000 * 1000); + long toKeepTime = now - (1500 * 1000); + + + Configuration conf = setupConfiguration(1800, -1); + enableFileControllers(conf, REMOTE_ROOT_LOG_DIR, ALL_FILE_CONTROLLERS, + ALL_FILE_CONTROLLER_NAMES); + long timeout = 2000L; + LogAggregationTestcaseBuilder.create(conf) + .withRootPath(ROOT) + .withRemoteRootLogPath(REMOTE_ROOT_LOG_DIR) + .withBothFileControllers() + .withUserDir(USER_ME, toKeepTime) + .withSuffixDir(SUFFIX, toDeleteTime) + .withBucketDir(toDeleteTime) + .withApps(//Apps for TFile + Lists.newArrayList( + new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList()), + new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList( + Pair.of(DIR_HOST1, toDeleteTime), + Pair.of(DIR_HOST2, toKeepTime))), + new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList( + Pair.of(DIR_HOST1, toDeleteTime), + Pair.of(DIR_HOST2, toDeleteTime))), + new AppDescriptor(T_FILE, toDeleteTime, Lists.newArrayList( + Pair.of(DIR_HOST1, toDeleteTime), + Pair.of(DIR_HOST2, toKeepTime))), + //Apps for IFile + new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList()), + new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList( + Pair.of(DIR_HOST1, toDeleteTime), + Pair.of(DIR_HOST2, toKeepTime))), + new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList( + Pair.of(DIR_HOST1, toDeleteTime), + Pair.of(DIR_HOST2, toDeleteTime))), + new AppDescriptor(I_FILE, toDeleteTime, Lists.newArrayList( + Pair.of(DIR_HOST1, toDeleteTime), + Pair.of(DIR_HOST2, toKeepTime))))) + .withFinishedApps(1, 2, 3, 5, 6, 7) + .withRunningApps(4, 8) + .injectExceptionForAppDirDeletion(3, 6) + .build() + .startDeletionService() + .verifyAppDirsDeleted(timeout, 1, 3, 5, 7) + .verifyAppDirsNotDeleted(timeout, 2, 4, 6, 8) + .verifyAppFilesDeleted(timeout, Lists.newArrayList(Pair.of(4, 1), Pair.of(8, 1))) + .verifyAppFilesNotDeleted(timeout, Lists.newArrayList(Pair.of(4, 2), Pair.of(8, 2))) + .teardown(1); + } + static class MockFileSystem extends FilterFileSystem { MockFileSystem() { super(mock(FileSystem.class)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java index 49042cf458..76ec8aab53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/AggregatedLogDeletionServiceForTest.java @@ -32,6 +32,7 @@ public class AggregatedLogDeletionServiceForTest extends AggregatedLogDeletionSe private final List finishedApplications; private final List runningApplications; private final Configuration conf; + private ApplicationClientProtocol mockRMClient; public AggregatedLogDeletionServiceForTest(List runningApplications, List finishedApplications) { @@ -48,11 +49,16 @@ public AggregatedLogDeletionServiceForTest(List runningApplicatio @Override protected ApplicationClientProtocol createRMClient() throws IOException { + if (mockRMClient != null) { + return mockRMClient; + } try { - return createMockRMClient(finishedApplications, runningApplications); + mockRMClient = + createMockRMClient(finishedApplications, runningApplications); } catch (Exception e) { throw new IOException(e); } + return mockRMClient; } @Override @@ -60,8 +66,7 @@ protected Configuration createConf() { return conf; } - @Override - protected void stopRMClient() { - // DO NOTHING + public ApplicationClientProtocol getMockRMClient() { + return mockRMClient; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java index 8f535d4071..f2074f8c8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/LogAggregationTestcase.java @@ -28,10 +28,12 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogDeletionService.LogDeletionTask; import org.apache.hadoop.yarn.logaggregation.testutils.LogAggregationTestcaseBuilder.AppDescriptor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -77,6 +79,7 @@ public class LogAggregationTestcase { private List appDirs; private final List appDescriptors; private AggregatedLogDeletionServiceForTest deletionService; + private ApplicationClientProtocol rmClient; public LogAggregationTestcase(LogAggregationTestcaseBuilder builder) throws IOException { conf = builder.conf; @@ -102,6 +105,8 @@ public LogAggregationTestcase(LogAggregationTestcaseBuilder builder) throws IOEx mockFs = ((FilterFileSystem) builder.rootFs).getRawFileSystem(); validateAppControllers(); setupMocks(); + + setupDeletionService(); } private void validateAppControllers() { @@ -241,10 +246,13 @@ private void setupListStatusForPath(PathWithFileStatus dir, FileStatus[] fileSta when(mockFs.listStatus(dir.path)).thenReturn(fileStatuses); } - public LogAggregationTestcase setupAndRunDeletionService() { + private void setupDeletionService() { List finishedApps = createFinishedAppsList(); List runningApps = createRunningAppsList(); deletionService = new AggregatedLogDeletionServiceForTest(runningApps, finishedApps, conf); + } + + public LogAggregationTestcase startDeletionService() { deletionService.init(conf); deletionService.start(); return this; @@ -271,10 +279,13 @@ private List createFinishedAppsList() { public LogAggregationTestcase runDeletionTask(long retentionSeconds) throws Exception { List finishedApps = createFinishedAppsList(); List runningApps = createRunningAppsList(); - ApplicationClientProtocol rmClient = createMockRMClient(finishedApps, runningApps); - AggregatedLogDeletionService.LogDeletionTask deletionTask = - new AggregatedLogDeletionService.LogDeletionTask(conf, retentionSeconds, rmClient); - deletionTask.run(); + rmClient = createMockRMClient(finishedApps, runningApps); + List tasks = deletionService.createLogDeletionTasks(conf, retentionSeconds, + rmClient); + for (LogDeletionTask deletionTask : tasks) { + deletionTask.run(); + } + return this; } @@ -359,8 +370,20 @@ private void verifyAppFileDeletion(int appId, int fileNo, int times, long timeou verify(mockFs, timeout(timeout).times(times)).delete(file.path, true); } - public void teardown() { + private void verifyMockRmClientWasClosedNTimes(int expectedRmClientCloses) + throws IOException { + ApplicationClientProtocol mockRMClient; + if (deletionService != null) { + mockRMClient = deletionService.getMockRMClient(); + } else { + mockRMClient = rmClient; + } + verify((Closeable)mockRMClient, times(expectedRmClientCloses)).close(); + } + + public void teardown(int expectedRmClientCloses) throws IOException { deletionService.stop(); + verifyMockRmClientWasClosedNTimes(expectedRmClientCloses); } public LogAggregationTestcase refreshLogRetentionSettings() throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java index c3f69c2a67..6eb1eb1ecb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/testutils/MockRMClientUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.logaggregation.testutils; +import org.apache.hadoop.test.MockitoUtil; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -34,7 +35,8 @@ public class MockRMClientUtils { public static ApplicationClientProtocol createMockRMClient( List finishedApplications, List runningApplications) throws Exception { - final ApplicationClientProtocol mockProtocol = mock(ApplicationClientProtocol.class); + final ApplicationClientProtocol mockProtocol = + MockitoUtil.mockProtocol(ApplicationClientProtocol.class); if (finishedApplications != null && !finishedApplications.isEmpty()) { for (ApplicationId appId : finishedApplications) { GetApplicationReportRequest request = GetApplicationReportRequest.newInstance(appId);