diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 4413e9d81c..debd4d9d34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -93,7 +93,7 @@ public class RMAppManager implements EventHandler, private int maxCompletedAppsInMemory; private int maxCompletedAppsInStateStore; protected int completedAppsInStateStore = 0; - protected LinkedList completedApps = new LinkedList<>(); + private LinkedList completedApps = new LinkedList<>(); private final RMContext rmContext; private final ApplicationMasterService masterService; @@ -316,72 +316,31 @@ protected void writeAuditLog(ApplicationId appId) { * check to see if hit the limit for max # completed apps kept */ protected synchronized void checkAppNumCompletedLimit() { - if (completedAppsInStateStore > maxCompletedAppsInStateStore) { - removeCompletedAppsFromStateStore(); - } - - if (completedApps.size() > maxCompletedAppsInMemory) { - removeCompletedAppsFromMemory(); - } - } - - private void removeCompletedAppsFromStateStore() { - int numDelete = completedAppsInStateStore - maxCompletedAppsInStateStore; - for (int i = 0; i < numDelete; i++) { - ApplicationId removeId = completedApps.get(i); + // check apps kept in state store. + while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) { + ApplicationId removeId = + completedApps.get(completedApps.size() - completedAppsInStateStore); RMApp removeApp = rmContext.getRMApps().get(removeId); - boolean deleteApp = shouldDeleteApp(removeApp); - - if (deleteApp) { - LOG.info("Max number of completed apps kept in state store met:" - + " maxCompletedAppsInStateStore = " - + maxCompletedAppsInStateStore + ", removing app " + removeId - + " from state store."); - rmContext.getStateStore().removeApplication(removeApp); - completedAppsInStateStore--; - } else { - LOG.info("Max number of completed apps kept in state store met:" - + " maxCompletedAppsInStateStore = " - + maxCompletedAppsInStateStore + ", but not removing app " - + removeId - + " from state store as log aggregation have not finished yet."); - } + LOG.info("Max number of completed apps kept in state store met:" + + " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore + + ", removing app " + removeApp.getApplicationId() + + " from state store."); + rmContext.getStateStore().removeApplication(removeApp); + completedAppsInStateStore--; } - } - private void removeCompletedAppsFromMemory() { - int numDelete = completedApps.size() - maxCompletedAppsInMemory; - int offset = 0; - for (int i = 0; i < numDelete; i++) { - int deletionIdx = i - offset; - ApplicationId removeId = completedApps.get(deletionIdx); - RMApp removeApp = rmContext.getRMApps().get(removeId); - boolean deleteApp = shouldDeleteApp(removeApp); - - if (deleteApp) { - ++offset; - LOG.info("Application should be expired, max number of completed apps" - + " kept in memory met: maxCompletedAppsInMemory = " - + this.maxCompletedAppsInMemory + ", removing app " + removeId - + " from memory: "); - completedApps.remove(deletionIdx); - rmContext.getRMApps().remove(removeId); - this.applicationACLsManager.removeApplication(removeId); - } else { - LOG.info("Application should be expired, max number of completed apps" - + " kept in memory met: maxCompletedAppsInMemory = " - + this.maxCompletedAppsInMemory + ", but not removing app " - + removeId - + " from memory as log aggregation have not finished yet."); - } + // check apps kept in memory. + while (completedApps.size() > this.maxCompletedAppsInMemory) { + ApplicationId removeId = completedApps.remove(); + LOG.info("Application should be expired, max number of completed apps" + + " kept in memory met: maxCompletedAppsInMemory = " + + this.maxCompletedAppsInMemory + ", removing app " + removeId + + " from memory: "); + rmContext.getRMApps().remove(removeId); + this.applicationACLsManager.removeApplication(removeId); } } - private boolean shouldDeleteApp(RMApp app) { - return !app.isLogAggregationEnabled() - || app.isLogAggregationFinished(); - } - @SuppressWarnings("unchecked") protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java index 535888cfcb..d8323f5497 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java @@ -243,10 +243,6 @@ ApplicationReport createAndGetApplicationReport(String clientUserName, */ int getMaxAppAttempts(); - boolean isLogAggregationEnabled(); - - boolean isLogAggregationFinished(); - /** * Returns the application type * @return the application type. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 83188671f8..c64434117b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -1765,16 +1765,6 @@ public void aggregateLogReport(NodeId nodeId, LogAggregationReport report) { logAggregation.aggregateLogReport(nodeId, report, this); } - @Override - public boolean isLogAggregationFinished() { - return logAggregation.isFinished(); - } - - @Override - public boolean isLogAggregationEnabled() { - return logAggregation.isEnabled(); - } - public String getLogAggregationFailureMessagesForNM(NodeId nodeId) { return logAggregation.getLogAggregationFailureMessagesForNM(nodeId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java index 63b4e44f0e..33fe028245 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java @@ -18,24 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager; -import static java.util.stream.Collectors.toSet; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.List; -import java.util.Set; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; -import org.mockito.ArgumentCaptor; /** * Base class for AppManager related test. @@ -76,28 +67,6 @@ public int getNumberOfCompletedAppsInStateStore() { return this.completedAppsInStateStore; } - public List getCompletedApps() { - return completedApps; - } - - public Set getFirstNCompletedApps(int n) { - return getCompletedApps().stream().limit(n).collect(toSet()); - } - - public Set getCompletedAppsWithEvenIdsInRange(int n) { - return getCompletedApps().stream().limit(n) - .filter(app -> app.getId() % 2 == 0).collect(toSet()); - } - - public Set getRemovedAppsFromStateStore(int numRemoves) { - ArgumentCaptor argumentCaptor = - ArgumentCaptor.forClass(RMApp.class); - verify(stateStore, times(numRemoves)) - .removeApplication(argumentCaptor.capture()); - return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId) - .collect(toSet()); - } - public void submitApplication( ApplicationSubmissionContext submissionContext, String user) throws YarnException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 77eb7cb8cb..5f5c3f2b0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -89,7 +89,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -143,52 +142,12 @@ private static List newRMApps(int n, long time, RMAppState state) { return list; } - private static List newRMAppsMixedLogAggregationStatus(int n, - long time, RMAppState state) { - List list = Lists.newArrayList(); - for (int i = 0; i < n; ++i) { - MockRMApp rmApp = new MockRMApp(i, time, state); - rmApp.setLogAggregationEnabled(true); - rmApp.setLogAggregationFinished(i % 2 == 0); - list.add(rmApp); - } - return list; - } - public RMContext mockRMContext(int n, long time) { - final ConcurrentMap map = createRMAppsMap(n, time); - return createMockRMContextInternal(map); - } - - public RMContext mockRMContextWithMixedLogAggregationStatus(int n, - long time) { - final ConcurrentMap map = - createRMAppsMapMixedLogAggStatus(n, time); - return createMockRMContextInternal(map); - } - - private ConcurrentMap createRMAppsMap(int n, - long time) { final List apps = newRMApps(n, time, RMAppState.FINISHED); final ConcurrentMap map = Maps.newConcurrentMap(); for (RMApp app : apps) { map.put(app.getApplicationId(), app); } - return map; - } - - private ConcurrentMap createRMAppsMapMixedLogAggStatus( - int n, long time) { - final List apps = - newRMAppsMixedLogAggregationStatus(n, time, RMAppState.FINISHED); - final ConcurrentMap map = Maps.newConcurrentMap(); - for (RMApp app : apps) { - map.put(app.getApplicationId(), app); - } - return map; - } - - private RMContext createMockRMContextInternal(ConcurrentMap map) { Dispatcher rmDispatcher = new AsyncDispatcher(); ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer( rmDispatcher); @@ -240,12 +199,8 @@ public void handle(RMAppEvent event) { } } - private void addToCompletedApps(TestRMAppManager appMonitor, - RMContext rmContext) { - // ensure applications are finished in order by their IDs - List sortedApps = new ArrayList<>(rmContext.getRMApps().values()); - sortedApps.sort(Comparator.comparingInt(o -> o.getApplicationId().getId())); - for (RMApp app : sortedApps) { + protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) { + for (RMApp app : rmContext.getRMApps().values()) { if (app.getState() == RMAppState.FINISHED || app.getState() == RMAppState.KILLED || app.getState() == RMAppState.FAILED) { @@ -654,32 +609,18 @@ public void testStateStoreAppLimitLessThanMemoryAppLimit() { addToCompletedApps(appMonitor, rmContext); Assert.assertEquals("Number of completed apps incorrect", allApps, appMonitor.getCompletedAppsListSize()); - - int numRemoveAppsFromStateStore = allApps - maxAppsInStateStore; - Set appsShouldBeRemovedFromStateStore = appMonitor - .getFirstNCompletedApps(numRemoveAppsFromStateStore); appMonitor.checkAppNumCompletedLimit(); - Set removedAppsFromStateStore = appMonitor - .getRemovedAppsFromStateStore(numRemoveAppsFromStateStore); - Assert.assertEquals("Number of apps incorrect after # completed check", maxAppsInMemory, rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", maxAppsInMemory, appMonitor.getCompletedAppsListSize()); + int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore; verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore)) .removeApplication(isA(RMApp.class)); Assert.assertEquals(maxAppsInStateStore, appMonitor.getNumberOfCompletedAppsInStateStore()); - - List completedApps = appMonitor.getCompletedApps(); - Assert.assertEquals(maxAppsInMemory, completedApps.size()); - Assert.assertEquals(numRemoveAppsFromStateStore, - removedAppsFromStateStore.size()); - Assert.assertEquals(numRemoveAppsFromStateStore, - Sets.intersection(appsShouldBeRemovedFromStateStore, - removedAppsFromStateStore).size()); } @Test @@ -697,12 +638,9 @@ public void testStateStoreAppLimitGreaterThanMemoryAppLimit() { addToCompletedApps(appMonitor, rmContext); Assert.assertEquals("Number of completed apps incorrect", allApps, appMonitor.getCompletedAppsListSize()); - - int numRemoveApps = allApps - maxAppsInMemory; - Set appsShouldBeRemoved = appMonitor - .getFirstNCompletedApps(numRemoveApps); appMonitor.checkAppNumCompletedLimit(); + int numRemoveApps = allApps - maxAppsInMemory; Assert.assertEquals("Number of apps incorrect after # completed check", maxAppsInMemory, rmContext.getRMApps().size()); Assert.assertEquals("Number of completed apps incorrect after check", @@ -711,56 +649,6 @@ public void testStateStoreAppLimitGreaterThanMemoryAppLimit() { isA(RMApp.class)); Assert.assertEquals(maxAppsInMemory, appMonitor.getNumberOfCompletedAppsInStateStore()); - - List completedApps = appMonitor.getCompletedApps(); - Assert.assertEquals(maxAppsInMemory, completedApps.size()); - Assert.assertEquals(numRemoveApps, appsShouldBeRemoved.size()); - assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved)); - } - - @Test - public void testStateStoreAppLimitSomeAppsHaveNotFinishedLogAggregation() { - long now = System.currentTimeMillis(); - final int allApps = 10; - RMContext rmContext = - mockRMContextWithMixedLogAggregationStatus(allApps, now - 20000); - Configuration conf = new YarnConfiguration(); - int maxAppsInMemory = 2; - conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, - maxAppsInMemory); - // greater than maxCompletedAppsInMemory, reset to - // RM_MAX_COMPLETED_APPLICATIONS. - conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, - 1000); - TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf); - - addToCompletedApps(appMonitor, rmContext); - Assert.assertEquals("Number of completed apps incorrect", allApps, - appMonitor.getCompletedAppsListSize()); - - int numRemoveApps = allApps - maxAppsInMemory; - int effectiveNumRemoveApps = numRemoveApps / 2; - //only apps with even ID would be deleted due to log aggregation status - int expectedNumberOfAppsInMemory = maxAppsInMemory + effectiveNumRemoveApps; - - Set appsShouldBeRemoved = appMonitor - .getCompletedAppsWithEvenIdsInRange(numRemoveApps); - appMonitor.checkAppNumCompletedLimit(); - - Assert.assertEquals("Number of apps incorrect after # completed check", - expectedNumberOfAppsInMemory, rmContext.getRMApps().size()); - Assert.assertEquals("Number of completed apps incorrect after check", - expectedNumberOfAppsInMemory, appMonitor.getCompletedAppsListSize()); - verify(rmContext.getStateStore(), times(effectiveNumRemoveApps)) - .removeApplication(isA(RMApp.class)); - Assert.assertEquals(expectedNumberOfAppsInMemory, - appMonitor.getNumberOfCompletedAppsInStateStore()); - - List completedApps = appMonitor.getCompletedApps(); - - Assert.assertEquals(expectedNumberOfAppsInMemory, completedApps.size()); - Assert.assertEquals(effectiveNumRemoveApps, appsShouldBeRemoved.size()); - assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved)); } protected void setupDispatcher(RMContext rmContext, Configuration conf) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java index a09aa00671..c0a82fa142 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java @@ -146,17 +146,6 @@ public String getOriginalTrackingUrl() { public int getMaxAppAttempts() { throw new UnsupportedOperationException("Not supported yet."); } - - @Override - public boolean isLogAggregationEnabled() { - throw new UnsupportedOperationException("Not supported yet."); - } - - @Override - public boolean isLogAggregationFinished() { - throw new UnsupportedOperationException("Not supported yet."); - } - @Override public ApplicationReport createAndGetApplicationReport( String clientUserName,boolean allowAccess) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java index 32ece346c2..ad29d274a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java @@ -70,8 +70,6 @@ public class MockRMApp implements RMApp { int maxAppAttempts = 1; List amReqs; private Set applicationTags = null; - private boolean logAggregationEnabled; - private boolean logAggregationFinished; public MockRMApp(int newid, long time, RMAppState newState) { finish = time; @@ -238,24 +236,6 @@ public int getMaxAppAttempts() { return maxAppAttempts; } - @Override - public boolean isLogAggregationEnabled() { - return logAggregationEnabled; - } - - @Override - public boolean isLogAggregationFinished() { - return logAggregationFinished; - } - - public void setLogAggregationEnabled(boolean enabled) { - this.logAggregationEnabled = enabled; - } - - public void setLogAggregationFinished(boolean finished) { - this.logAggregationFinished = finished; - } - public void setNumMaxRetries(int maxAppAttempts) { this.maxAppAttempts = maxAppAttempts; }