YARN-4946. RM should not consider an application as COMPLETED when log aggregation is not in a terminal state (snemeth via rkanter)

This commit is contained in:
Robert Kanter 2018-08-09 14:58:04 -07:00
parent 8244abb7ae
commit b2517dd66b
6 changed files with 295 additions and 74 deletions

View File

@ -86,7 +86,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
private int maxCompletedAppsInMemory;
private int maxCompletedAppsInStateStore;
protected int completedAppsInStateStore = 0;
private LinkedList<ApplicationId> completedApps = new LinkedList<ApplicationId>();
protected LinkedList<ApplicationId> completedApps = new LinkedList<>();
private final RMContext rmContext;
private final ApplicationMasterService masterService;
@ -284,30 +284,71 @@ protected void writeAuditLog(ApplicationId appId) {
* check to see if hit the limit for max # completed apps kept
*/
protected synchronized void checkAppNumCompletedLimit() {
// check apps kept in state store.
while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
ApplicationId removeId =
completedApps.get(completedApps.size() - completedAppsInStateStore);
if (completedAppsInStateStore > maxCompletedAppsInStateStore) {
removeCompletedAppsFromStateStore();
}
if (completedApps.size() > maxCompletedAppsInMemory) {
removeCompletedAppsFromMemory();
}
}
private void removeCompletedAppsFromStateStore() {
int numDelete = completedAppsInStateStore - maxCompletedAppsInStateStore;
for (int i = 0; i < numDelete; i++) {
ApplicationId removeId = completedApps.get(i);
RMApp removeApp = rmContext.getRMApps().get(removeId);
boolean deleteApp = shouldDeleteApp(removeApp);
if (deleteApp) {
LOG.info("Max number of completed apps kept in state store met:"
+ " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
+ ", removing app " + removeApp.getApplicationId()
+ " maxCompletedAppsInStateStore = "
+ maxCompletedAppsInStateStore + ", removing app " + removeId
+ " from state store.");
rmContext.getStateStore().removeApplication(removeApp);
completedAppsInStateStore--;
} else {
LOG.info("Max number of completed apps kept in state store met:"
+ " maxCompletedAppsInStateStore = "
+ maxCompletedAppsInStateStore + ", but not removing app "
+ removeId
+ " from state store as log aggregation have not finished yet.");
}
}
}
// check apps kept in memorty.
while (completedApps.size() > this.maxCompletedAppsInMemory) {
ApplicationId removeId = completedApps.remove();
private void removeCompletedAppsFromMemory() {
int numDelete = completedApps.size() - maxCompletedAppsInMemory;
int offset = 0;
for (int i = 0; i < numDelete; i++) {
int deletionIdx = i - offset;
ApplicationId removeId = completedApps.get(deletionIdx);
RMApp removeApp = rmContext.getRMApps().get(removeId);
boolean deleteApp = shouldDeleteApp(removeApp);
if (deleteApp) {
++offset;
LOG.info("Application should be expired, max number of completed apps"
+ " kept in memory met: maxCompletedAppsInMemory = "
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
+ " from memory: ");
completedApps.remove(deletionIdx);
rmContext.getRMApps().remove(removeId);
this.applicationACLsManager.removeApplication(removeId);
} else {
LOG.info("Application should be expired, max number of completed apps"
+ " kept in memory met: maxCompletedAppsInMemory = "
+ this.maxCompletedAppsInMemory + ", but not removing app "
+ removeId
+ " from memory as log aggregation have not finished yet.");
}
}
}
private boolean shouldDeleteApp(RMApp app) {
return !app.isLogAggregationEnabled()
|| app.isLogAggregationFinished();
}
@SuppressWarnings("unchecked")
protected void submitApplication(

View File

@ -243,6 +243,10 @@ ApplicationReport createAndGetApplicationReport(String clientUserName,
*/
int getMaxAppAttempts();
boolean isLogAggregationEnabled();
boolean isLogAggregationFinished();
/**
* Returns the application type
* @return the application type.

View File

@ -1912,7 +1912,13 @@ public LogAggregationStatus getLogAggregationStatusForAppReport() {
}
}
private boolean isLogAggregationFinished() {
@Override
public boolean isLogAggregationEnabled() {
return logAggregationEnabled;
}
@Override
public boolean isLogAggregationFinished() {
return this.logAggregationStatusForAppReport
.equals(LogAggregationStatus.SUCCEEDED)
|| this.logAggregationStatusForAppReport

View File

@ -19,28 +19,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -88,28 +69,48 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.ManagedParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import static java.util.stream.Collectors.toSet;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.matches;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Testing applications being retired from RM.
@ -131,7 +132,7 @@ public synchronized void setAppEventType(RMAppEventType newType) {
}
public static List<RMApp> newRMApps(int n, long time, RMAppState state) {
private static List<RMApp> newRMApps(int n, long time, RMAppState state) {
List<RMApp> list = Lists.newArrayList();
for (int i = 0; i < n; ++i) {
list.add(new MockRMApp(i, time, state));
@ -139,12 +140,52 @@ public static List<RMApp> newRMApps(int n, long time, RMAppState state) {
return list;
}
private static List<RMApp> newRMAppsMixedLogAggregationStatus(int n,
long time, RMAppState state) {
List<RMApp> list = Lists.newArrayList();
for (int i = 0; i < n; ++i) {
MockRMApp rmApp = new MockRMApp(i, time, state);
rmApp.setLogAggregationEnabled(true);
rmApp.setLogAggregationFinished(i % 2 == 0);
list.add(rmApp);
}
return list;
}
public RMContext mockRMContext(int n, long time) {
final ConcurrentMap<ApplicationId, RMApp> map = createRMAppsMap(n, time);
return createMockRMContextInternal(map);
}
public RMContext mockRMContextWithMixedLogAggregationStatus(int n,
long time) {
final ConcurrentMap<ApplicationId, RMApp> map =
createRMAppsMapMixedLogAggStatus(n, time);
return createMockRMContextInternal(map);
}
private ConcurrentMap<ApplicationId, RMApp> createRMAppsMap(int n,
long time) {
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
for (RMApp app : apps) {
map.put(app.getApplicationId(), app);
}
return map;
}
private ConcurrentMap<ApplicationId, RMApp> createRMAppsMapMixedLogAggStatus(
int n, long time) {
final List<RMApp> apps =
newRMAppsMixedLogAggregationStatus(n, time, RMAppState.FINISHED);
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
for (RMApp app : apps) {
map.put(app.getApplicationId(), app);
}
return map;
}
private RMContext createMockRMContextInternal(ConcurrentMap<ApplicationId, RMApp> map) {
Dispatcher rmDispatcher = new AsyncDispatcher();
ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
rmDispatcher);
@ -198,9 +239,11 @@ public void handle(RMAppEvent event) {
// Extend and make the functions we want to test public
public class TestRMAppManager extends RMAppManager {
private final RMStateStore stateStore;
public TestRMAppManager(RMContext context, Configuration conf) {
super(context, null, null, new ApplicationACLsManager(conf), conf);
this.stateStore = context.getStateStore();
}
public TestRMAppManager(RMContext context,
@ -208,6 +251,7 @@ public TestRMAppManager(RMContext context,
YarnScheduler scheduler, ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
this.stateStore = context.getStateStore();
}
public void checkAppNumCompletedLimit() {
@ -222,10 +266,32 @@ public int getCompletedAppsListSize() {
return super.getCompletedAppsListSize();
}
public int getCompletedAppsInStateStore() {
public int getNumberOfCompletedAppsInStateStore() {
return this.completedAppsInStateStore;
}
List<ApplicationId> getCompletedApps() {
return completedApps;
}
Set<ApplicationId> getFirstNCompletedApps(int n) {
return getCompletedApps().stream().limit(n).collect(toSet());
}
Set<ApplicationId> getCompletedAppsWithEvenIdsInRange(int n) {
return getCompletedApps().stream().limit(n)
.filter(app -> app.getId() % 2 == 0).collect(toSet());
}
Set<ApplicationId> getRemovedAppsFromStateStore(int numRemoves) {
ArgumentCaptor<RMApp> argumentCaptor =
ArgumentCaptor.forClass(RMApp.class);
verify(stateStore, times(numRemoves))
.removeApplication(argumentCaptor.capture());
return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId)
.collect(toSet());
}
public void submitApplication(
ApplicationSubmissionContext submissionContext, String user)
throws YarnException, IOException {
@ -234,8 +300,12 @@ public void submitApplication(
}
}
protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) {
for (RMApp app : rmContext.getRMApps().values()) {
private void addToCompletedApps(TestRMAppManager appMonitor,
RMContext rmContext) {
// ensure applications are finished in order by their IDs
List<RMApp> sortedApps = new ArrayList<>(rmContext.getRMApps().values());
sortedApps.sort(Comparator.comparingInt(o -> o.getApplicationId().getId()));
for (RMApp app : sortedApps) {
if (app.getState() == RMAppState.FINISHED
|| app.getState() == RMAppState.KILLED
|| app.getState() == RMAppState.FAILED) {
@ -631,7 +701,8 @@ public void testRMAppRetireZeroSetting() throws Exception {
@Test
public void testStateStoreAppLimitLessThanMemoryAppLimit() {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
final int allApps = 10;
RMContext rmContext = mockRMContext(allApps, now - 20000);
Configuration conf = new YarnConfiguration();
int maxAppsInMemory = 8;
int maxAppsInStateStore = 4;
@ -641,39 +712,57 @@ public void testStateStoreAppLimitLessThanMemoryAppLimit() {
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
addToCompletedApps(appMonitor, rmContext);
Assert.assertEquals("Number of completed apps incorrect", 10,
Assert.assertEquals("Number of completed apps incorrect", allApps,
appMonitor.getCompletedAppsListSize());
int numRemoveAppsFromStateStore = allApps - maxAppsInStateStore;
Set<ApplicationId> appsShouldBeRemovedFromStateStore = appMonitor
.getFirstNCompletedApps(numRemoveAppsFromStateStore);
appMonitor.checkAppNumCompletedLimit();
Set<ApplicationId> removedAppsFromStateStore = appMonitor
.getRemovedAppsFromStateStore(numRemoveAppsFromStateStore);
Assert.assertEquals("Number of apps incorrect after # completed check",
maxAppsInMemory, rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check",
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
.removeApplication(isA(RMApp.class));
Assert.assertEquals(maxAppsInStateStore,
appMonitor.getCompletedAppsInStateStore());
appMonitor.getNumberOfCompletedAppsInStateStore());
List<ApplicationId> completedApps = appMonitor.getCompletedApps();
Assert.assertEquals(maxAppsInMemory, completedApps.size());
Assert.assertEquals(numRemoveAppsFromStateStore,
removedAppsFromStateStore.size());
Assert.assertEquals(numRemoveAppsFromStateStore,
Sets.intersection(appsShouldBeRemovedFromStateStore,
removedAppsFromStateStore).size());
}
@Test
public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
public void testStateStoreAppLimitGreaterThanMemoryAppLimit() {
long now = System.currentTimeMillis();
RMContext rmContext = mockRMContext(10, now - 20000);
final int allApps = 10;
RMContext rmContext = mockRMContext(allApps, now - 20000);
Configuration conf = new YarnConfiguration();
int maxAppsInMemory = 8;
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, maxAppsInMemory);
// larger than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
// greater than maxCompletedAppsInMemory, reset to RM_MAX_COMPLETED_APPLICATIONS.
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS, 1000);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
addToCompletedApps(appMonitor, rmContext);
Assert.assertEquals("Number of completed apps incorrect", 10,
Assert.assertEquals("Number of completed apps incorrect", allApps,
appMonitor.getCompletedAppsListSize());
int numRemoveApps = allApps - maxAppsInMemory;
Set<ApplicationId> appsShouldBeRemoved = appMonitor
.getFirstNCompletedApps(numRemoveApps);
appMonitor.checkAppNumCompletedLimit();
int numRemoveApps = 10 - maxAppsInMemory;
Assert.assertEquals("Number of apps incorrect after # completed check",
maxAppsInMemory, rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check",
@ -681,7 +770,57 @@ public void testStateStoreAppLimitLargerThanMemoryAppLimit() {
verify(rmContext.getStateStore(), times(numRemoveApps)).removeApplication(
isA(RMApp.class));
Assert.assertEquals(maxAppsInMemory,
appMonitor.getCompletedAppsInStateStore());
appMonitor.getNumberOfCompletedAppsInStateStore());
List<ApplicationId> completedApps = appMonitor.getCompletedApps();
Assert.assertEquals(maxAppsInMemory, completedApps.size());
Assert.assertEquals(numRemoveApps, appsShouldBeRemoved.size());
assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
}
@Test
public void testStateStoreAppLimitSomeAppsHaveNotFinishedLogAggregation() {
long now = System.currentTimeMillis();
final int allApps = 10;
RMContext rmContext =
mockRMContextWithMixedLogAggregationStatus(allApps, now - 20000);
Configuration conf = new YarnConfiguration();
int maxAppsInMemory = 2;
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
maxAppsInMemory);
// greater than maxCompletedAppsInMemory, reset to
// RM_MAX_COMPLETED_APPLICATIONS.
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
1000);
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
addToCompletedApps(appMonitor, rmContext);
Assert.assertEquals("Number of completed apps incorrect", allApps,
appMonitor.getCompletedAppsListSize());
int numRemoveApps = allApps - maxAppsInMemory;
int effectiveNumRemoveApps = numRemoveApps / 2;
//only apps with even ID would be deleted due to log aggregation status
int expectedNumberOfAppsInMemory = maxAppsInMemory + effectiveNumRemoveApps;
Set<ApplicationId> appsShouldBeRemoved = appMonitor
.getCompletedAppsWithEvenIdsInRange(numRemoveApps);
appMonitor.checkAppNumCompletedLimit();
Assert.assertEquals("Number of apps incorrect after # completed check",
expectedNumberOfAppsInMemory, rmContext.getRMApps().size());
Assert.assertEquals("Number of completed apps incorrect after check",
expectedNumberOfAppsInMemory, appMonitor.getCompletedAppsListSize());
verify(rmContext.getStateStore(), times(effectiveNumRemoveApps))
.removeApplication(isA(RMApp.class));
Assert.assertEquals(expectedNumberOfAppsInMemory,
appMonitor.getNumberOfCompletedAppsInStateStore());
List<ApplicationId> completedApps = appMonitor.getCompletedApps();
Assert.assertEquals(expectedNumberOfAppsInMemory, completedApps.size());
Assert.assertEquals(effectiveNumRemoveApps, appsShouldBeRemoved.size());
assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
}
protected void setupDispatcher(RMContext rmContext, Configuration conf) {

View File

@ -146,6 +146,17 @@ public String getOriginalTrackingUrl() {
public int getMaxAppAttempts() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public boolean isLogAggregationEnabled() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public boolean isLogAggregationFinished() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public ApplicationReport createAndGetApplicationReport(
String clientUserName,boolean allowAccess) {

View File

@ -70,6 +70,8 @@ public class MockRMApp implements RMApp {
int maxAppAttempts = 1;
List<ResourceRequest> amReqs;
private Set<String> applicationTags = null;
private boolean logAggregationEnabled;
private boolean logAggregationFinished;
public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
@ -236,6 +238,24 @@ public int getMaxAppAttempts() {
return maxAppAttempts;
}
@Override
public boolean isLogAggregationEnabled() {
return logAggregationEnabled;
}
@Override
public boolean isLogAggregationFinished() {
return logAggregationFinished;
}
public void setLogAggregationEnabled(boolean enabled) {
this.logAggregationEnabled = enabled;
}
public void setLogAggregationFinished(boolean finished) {
this.logAggregationFinished = finished;
}
public void setNumMaxRetries(int maxAppAttempts) {
this.maxAppAttempts = maxAppAttempts;
}