YARN-9848. Revert YARN-4946. Contributed by Steven Rand.
This commit is contained in:
parent
e2322e1117
commit
6ef01646ba
@ -93,7 +93,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||||||
private int maxCompletedAppsInMemory;
|
private int maxCompletedAppsInMemory;
|
||||||
private int maxCompletedAppsInStateStore;
|
private int maxCompletedAppsInStateStore;
|
||||||
protected int completedAppsInStateStore = 0;
|
protected int completedAppsInStateStore = 0;
|
||||||
protected LinkedList<ApplicationId> completedApps = new LinkedList<>();
|
private LinkedList<ApplicationId> completedApps = new LinkedList<>();
|
||||||
|
|
||||||
private final RMContext rmContext;
|
private final RMContext rmContext;
|
||||||
private final ApplicationMasterService masterService;
|
private final ApplicationMasterService masterService;
|
||||||
@ -316,72 +316,31 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||||||
* check to see if hit the limit for max # completed apps kept
|
* check to see if hit the limit for max # completed apps kept
|
||||||
*/
|
*/
|
||||||
protected synchronized void checkAppNumCompletedLimit() {
|
protected synchronized void checkAppNumCompletedLimit() {
|
||||||
if (completedAppsInStateStore > maxCompletedAppsInStateStore) {
|
// check apps kept in state store.
|
||||||
removeCompletedAppsFromStateStore();
|
while (completedAppsInStateStore > this.maxCompletedAppsInStateStore) {
|
||||||
}
|
ApplicationId removeId =
|
||||||
|
completedApps.get(completedApps.size() - completedAppsInStateStore);
|
||||||
if (completedApps.size() > maxCompletedAppsInMemory) {
|
|
||||||
removeCompletedAppsFromMemory();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void removeCompletedAppsFromStateStore() {
|
|
||||||
int numDelete = completedAppsInStateStore - maxCompletedAppsInStateStore;
|
|
||||||
for (int i = 0; i < numDelete; i++) {
|
|
||||||
ApplicationId removeId = completedApps.get(i);
|
|
||||||
RMApp removeApp = rmContext.getRMApps().get(removeId);
|
RMApp removeApp = rmContext.getRMApps().get(removeId);
|
||||||
boolean deleteApp = shouldDeleteApp(removeApp);
|
LOG.info("Max number of completed apps kept in state store met:"
|
||||||
|
+ " maxCompletedAppsInStateStore = " + maxCompletedAppsInStateStore
|
||||||
if (deleteApp) {
|
+ ", removing app " + removeApp.getApplicationId()
|
||||||
LOG.info("Max number of completed apps kept in state store met:"
|
+ " from state store.");
|
||||||
+ " maxCompletedAppsInStateStore = "
|
rmContext.getStateStore().removeApplication(removeApp);
|
||||||
+ maxCompletedAppsInStateStore + ", removing app " + removeId
|
completedAppsInStateStore--;
|
||||||
+ " from state store.");
|
|
||||||
rmContext.getStateStore().removeApplication(removeApp);
|
|
||||||
completedAppsInStateStore--;
|
|
||||||
} else {
|
|
||||||
LOG.info("Max number of completed apps kept in state store met:"
|
|
||||||
+ " maxCompletedAppsInStateStore = "
|
|
||||||
+ maxCompletedAppsInStateStore + ", but not removing app "
|
|
||||||
+ removeId
|
|
||||||
+ " from state store as log aggregation have not finished yet.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private void removeCompletedAppsFromMemory() {
|
// check apps kept in memory.
|
||||||
int numDelete = completedApps.size() - maxCompletedAppsInMemory;
|
while (completedApps.size() > this.maxCompletedAppsInMemory) {
|
||||||
int offset = 0;
|
ApplicationId removeId = completedApps.remove();
|
||||||
for (int i = 0; i < numDelete; i++) {
|
LOG.info("Application should be expired, max number of completed apps"
|
||||||
int deletionIdx = i - offset;
|
+ " kept in memory met: maxCompletedAppsInMemory = "
|
||||||
ApplicationId removeId = completedApps.get(deletionIdx);
|
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
|
||||||
RMApp removeApp = rmContext.getRMApps().get(removeId);
|
+ " from memory: ");
|
||||||
boolean deleteApp = shouldDeleteApp(removeApp);
|
rmContext.getRMApps().remove(removeId);
|
||||||
|
this.applicationACLsManager.removeApplication(removeId);
|
||||||
if (deleteApp) {
|
|
||||||
++offset;
|
|
||||||
LOG.info("Application should be expired, max number of completed apps"
|
|
||||||
+ " kept in memory met: maxCompletedAppsInMemory = "
|
|
||||||
+ this.maxCompletedAppsInMemory + ", removing app " + removeId
|
|
||||||
+ " from memory: ");
|
|
||||||
completedApps.remove(deletionIdx);
|
|
||||||
rmContext.getRMApps().remove(removeId);
|
|
||||||
this.applicationACLsManager.removeApplication(removeId);
|
|
||||||
} else {
|
|
||||||
LOG.info("Application should be expired, max number of completed apps"
|
|
||||||
+ " kept in memory met: maxCompletedAppsInMemory = "
|
|
||||||
+ this.maxCompletedAppsInMemory + ", but not removing app "
|
|
||||||
+ removeId
|
|
||||||
+ " from memory as log aggregation have not finished yet.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean shouldDeleteApp(RMApp app) {
|
|
||||||
return !app.isLogAggregationEnabled()
|
|
||||||
|| app.isLogAggregationFinished();
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected void submitApplication(
|
protected void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
|
@ -243,10 +243,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
|
|||||||
*/
|
*/
|
||||||
int getMaxAppAttempts();
|
int getMaxAppAttempts();
|
||||||
|
|
||||||
boolean isLogAggregationEnabled();
|
|
||||||
|
|
||||||
boolean isLogAggregationFinished();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the application type
|
* Returns the application type
|
||||||
* @return the application type.
|
* @return the application type.
|
||||||
|
@ -1765,16 +1765,6 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|||||||
logAggregation.aggregateLogReport(nodeId, report, this);
|
logAggregation.aggregateLogReport(nodeId, report, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLogAggregationFinished() {
|
|
||||||
return logAggregation.isFinished();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLogAggregationEnabled() {
|
|
||||||
return logAggregation.isEnabled();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
|
public String getLogAggregationFailureMessagesForNM(NodeId nodeId) {
|
||||||
return logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
|
return logAggregation.getLogAggregationFailureMessagesForNM(nodeId);
|
||||||
}
|
}
|
||||||
|
@ -18,24 +18,15 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import static java.util.stream.Collectors.toSet;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.mockito.ArgumentCaptor;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for AppManager related test.
|
* Base class for AppManager related test.
|
||||||
@ -76,28 +67,6 @@ public class AppManagerTestBase {
|
|||||||
return this.completedAppsInStateStore;
|
return this.completedAppsInStateStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<ApplicationId> getCompletedApps() {
|
|
||||||
return completedApps;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<ApplicationId> getFirstNCompletedApps(int n) {
|
|
||||||
return getCompletedApps().stream().limit(n).collect(toSet());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<ApplicationId> getCompletedAppsWithEvenIdsInRange(int n) {
|
|
||||||
return getCompletedApps().stream().limit(n)
|
|
||||||
.filter(app -> app.getId() % 2 == 0).collect(toSet());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Set<ApplicationId> getRemovedAppsFromStateStore(int numRemoves) {
|
|
||||||
ArgumentCaptor<RMApp> argumentCaptor =
|
|
||||||
ArgumentCaptor.forClass(RMApp.class);
|
|
||||||
verify(stateStore, times(numRemoves))
|
|
||||||
.removeApplication(argumentCaptor.capture());
|
|
||||||
return argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId)
|
|
||||||
.collect(toSet());
|
|
||||||
}
|
|
||||||
|
|
||||||
public void submitApplication(
|
public void submitApplication(
|
||||||
ApplicationSubmissionContext submissionContext, String user)
|
ApplicationSubmissionContext submissionContext, String user)
|
||||||
throws YarnException {
|
throws YarnException {
|
||||||
|
@ -89,7 +89,6 @@ import java.io.IOException;
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -143,52 +142,12 @@ public class TestAppManager extends AppManagerTestBase{
|
|||||||
return list;
|
return list;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static List<RMApp> newRMAppsMixedLogAggregationStatus(int n,
|
|
||||||
long time, RMAppState state) {
|
|
||||||
List<RMApp> list = Lists.newArrayList();
|
|
||||||
for (int i = 0; i < n; ++i) {
|
|
||||||
MockRMApp rmApp = new MockRMApp(i, time, state);
|
|
||||||
rmApp.setLogAggregationEnabled(true);
|
|
||||||
rmApp.setLogAggregationFinished(i % 2 == 0);
|
|
||||||
list.add(rmApp);
|
|
||||||
}
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RMContext mockRMContext(int n, long time) {
|
public RMContext mockRMContext(int n, long time) {
|
||||||
final ConcurrentMap<ApplicationId, RMApp> map = createRMAppsMap(n, time);
|
|
||||||
return createMockRMContextInternal(map);
|
|
||||||
}
|
|
||||||
|
|
||||||
public RMContext mockRMContextWithMixedLogAggregationStatus(int n,
|
|
||||||
long time) {
|
|
||||||
final ConcurrentMap<ApplicationId, RMApp> map =
|
|
||||||
createRMAppsMapMixedLogAggStatus(n, time);
|
|
||||||
return createMockRMContextInternal(map);
|
|
||||||
}
|
|
||||||
|
|
||||||
private ConcurrentMap<ApplicationId, RMApp> createRMAppsMap(int n,
|
|
||||||
long time) {
|
|
||||||
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
|
final List<RMApp> apps = newRMApps(n, time, RMAppState.FINISHED);
|
||||||
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
|
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
|
||||||
for (RMApp app : apps) {
|
for (RMApp app : apps) {
|
||||||
map.put(app.getApplicationId(), app);
|
map.put(app.getApplicationId(), app);
|
||||||
}
|
}
|
||||||
return map;
|
|
||||||
}
|
|
||||||
|
|
||||||
private ConcurrentMap<ApplicationId, RMApp> createRMAppsMapMixedLogAggStatus(
|
|
||||||
int n, long time) {
|
|
||||||
final List<RMApp> apps =
|
|
||||||
newRMAppsMixedLogAggregationStatus(n, time, RMAppState.FINISHED);
|
|
||||||
final ConcurrentMap<ApplicationId, RMApp> map = Maps.newConcurrentMap();
|
|
||||||
for (RMApp app : apps) {
|
|
||||||
map.put(app.getApplicationId(), app);
|
|
||||||
}
|
|
||||||
return map;
|
|
||||||
}
|
|
||||||
|
|
||||||
private RMContext createMockRMContextInternal(ConcurrentMap<ApplicationId, RMApp> map) {
|
|
||||||
Dispatcher rmDispatcher = new AsyncDispatcher();
|
Dispatcher rmDispatcher = new AsyncDispatcher();
|
||||||
ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
|
ContainerAllocationExpirer containerAllocationExpirer = new ContainerAllocationExpirer(
|
||||||
rmDispatcher);
|
rmDispatcher);
|
||||||
@ -240,12 +199,8 @@ public class TestAppManager extends AppManagerTestBase{
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addToCompletedApps(TestRMAppManager appMonitor,
|
protected void addToCompletedApps(TestRMAppManager appMonitor, RMContext rmContext) {
|
||||||
RMContext rmContext) {
|
for (RMApp app : rmContext.getRMApps().values()) {
|
||||||
// ensure applications are finished in order by their IDs
|
|
||||||
List<RMApp> sortedApps = new ArrayList<>(rmContext.getRMApps().values());
|
|
||||||
sortedApps.sort(Comparator.comparingInt(o -> o.getApplicationId().getId()));
|
|
||||||
for (RMApp app : sortedApps) {
|
|
||||||
if (app.getState() == RMAppState.FINISHED
|
if (app.getState() == RMAppState.FINISHED
|
||||||
|| app.getState() == RMAppState.KILLED
|
|| app.getState() == RMAppState.KILLED
|
||||||
|| app.getState() == RMAppState.FAILED) {
|
|| app.getState() == RMAppState.FAILED) {
|
||||||
@ -654,32 +609,18 @@ public class TestAppManager extends AppManagerTestBase{
|
|||||||
addToCompletedApps(appMonitor, rmContext);
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
Assert.assertEquals("Number of completed apps incorrect", allApps,
|
Assert.assertEquals("Number of completed apps incorrect", allApps,
|
||||||
appMonitor.getCompletedAppsListSize());
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
|
||||||
int numRemoveAppsFromStateStore = allApps - maxAppsInStateStore;
|
|
||||||
Set<ApplicationId> appsShouldBeRemovedFromStateStore = appMonitor
|
|
||||||
.getFirstNCompletedApps(numRemoveAppsFromStateStore);
|
|
||||||
appMonitor.checkAppNumCompletedLimit();
|
appMonitor.checkAppNumCompletedLimit();
|
||||||
|
|
||||||
Set<ApplicationId> removedAppsFromStateStore = appMonitor
|
|
||||||
.getRemovedAppsFromStateStore(numRemoveAppsFromStateStore);
|
|
||||||
|
|
||||||
Assert.assertEquals("Number of apps incorrect after # completed check",
|
Assert.assertEquals("Number of apps incorrect after # completed check",
|
||||||
maxAppsInMemory, rmContext.getRMApps().size());
|
maxAppsInMemory, rmContext.getRMApps().size());
|
||||||
Assert.assertEquals("Number of completed apps incorrect after check",
|
Assert.assertEquals("Number of completed apps incorrect after check",
|
||||||
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
|
maxAppsInMemory, appMonitor.getCompletedAppsListSize());
|
||||||
|
|
||||||
|
int numRemoveAppsFromStateStore = 10 - maxAppsInStateStore;
|
||||||
verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
|
verify(rmContext.getStateStore(), times(numRemoveAppsFromStateStore))
|
||||||
.removeApplication(isA(RMApp.class));
|
.removeApplication(isA(RMApp.class));
|
||||||
Assert.assertEquals(maxAppsInStateStore,
|
Assert.assertEquals(maxAppsInStateStore,
|
||||||
appMonitor.getNumberOfCompletedAppsInStateStore());
|
appMonitor.getNumberOfCompletedAppsInStateStore());
|
||||||
|
|
||||||
List<ApplicationId> completedApps = appMonitor.getCompletedApps();
|
|
||||||
Assert.assertEquals(maxAppsInMemory, completedApps.size());
|
|
||||||
Assert.assertEquals(numRemoveAppsFromStateStore,
|
|
||||||
removedAppsFromStateStore.size());
|
|
||||||
Assert.assertEquals(numRemoveAppsFromStateStore,
|
|
||||||
Sets.intersection(appsShouldBeRemovedFromStateStore,
|
|
||||||
removedAppsFromStateStore).size());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -697,12 +638,9 @@ public class TestAppManager extends AppManagerTestBase{
|
|||||||
addToCompletedApps(appMonitor, rmContext);
|
addToCompletedApps(appMonitor, rmContext);
|
||||||
Assert.assertEquals("Number of completed apps incorrect", allApps,
|
Assert.assertEquals("Number of completed apps incorrect", allApps,
|
||||||
appMonitor.getCompletedAppsListSize());
|
appMonitor.getCompletedAppsListSize());
|
||||||
|
|
||||||
int numRemoveApps = allApps - maxAppsInMemory;
|
|
||||||
Set<ApplicationId> appsShouldBeRemoved = appMonitor
|
|
||||||
.getFirstNCompletedApps(numRemoveApps);
|
|
||||||
appMonitor.checkAppNumCompletedLimit();
|
appMonitor.checkAppNumCompletedLimit();
|
||||||
|
|
||||||
|
int numRemoveApps = allApps - maxAppsInMemory;
|
||||||
Assert.assertEquals("Number of apps incorrect after # completed check",
|
Assert.assertEquals("Number of apps incorrect after # completed check",
|
||||||
maxAppsInMemory, rmContext.getRMApps().size());
|
maxAppsInMemory, rmContext.getRMApps().size());
|
||||||
Assert.assertEquals("Number of completed apps incorrect after check",
|
Assert.assertEquals("Number of completed apps incorrect after check",
|
||||||
@ -711,56 +649,6 @@ public class TestAppManager extends AppManagerTestBase{
|
|||||||
isA(RMApp.class));
|
isA(RMApp.class));
|
||||||
Assert.assertEquals(maxAppsInMemory,
|
Assert.assertEquals(maxAppsInMemory,
|
||||||
appMonitor.getNumberOfCompletedAppsInStateStore());
|
appMonitor.getNumberOfCompletedAppsInStateStore());
|
||||||
|
|
||||||
List<ApplicationId> completedApps = appMonitor.getCompletedApps();
|
|
||||||
Assert.assertEquals(maxAppsInMemory, completedApps.size());
|
|
||||||
Assert.assertEquals(numRemoveApps, appsShouldBeRemoved.size());
|
|
||||||
assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testStateStoreAppLimitSomeAppsHaveNotFinishedLogAggregation() {
|
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
final int allApps = 10;
|
|
||||||
RMContext rmContext =
|
|
||||||
mockRMContextWithMixedLogAggregationStatus(allApps, now - 20000);
|
|
||||||
Configuration conf = new YarnConfiguration();
|
|
||||||
int maxAppsInMemory = 2;
|
|
||||||
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
|
|
||||||
maxAppsInMemory);
|
|
||||||
// greater than maxCompletedAppsInMemory, reset to
|
|
||||||
// RM_MAX_COMPLETED_APPLICATIONS.
|
|
||||||
conf.setInt(YarnConfiguration.RM_STATE_STORE_MAX_COMPLETED_APPLICATIONS,
|
|
||||||
1000);
|
|
||||||
TestRMAppManager appMonitor = new TestRMAppManager(rmContext, conf);
|
|
||||||
|
|
||||||
addToCompletedApps(appMonitor, rmContext);
|
|
||||||
Assert.assertEquals("Number of completed apps incorrect", allApps,
|
|
||||||
appMonitor.getCompletedAppsListSize());
|
|
||||||
|
|
||||||
int numRemoveApps = allApps - maxAppsInMemory;
|
|
||||||
int effectiveNumRemoveApps = numRemoveApps / 2;
|
|
||||||
//only apps with even ID would be deleted due to log aggregation status
|
|
||||||
int expectedNumberOfAppsInMemory = maxAppsInMemory + effectiveNumRemoveApps;
|
|
||||||
|
|
||||||
Set<ApplicationId> appsShouldBeRemoved = appMonitor
|
|
||||||
.getCompletedAppsWithEvenIdsInRange(numRemoveApps);
|
|
||||||
appMonitor.checkAppNumCompletedLimit();
|
|
||||||
|
|
||||||
Assert.assertEquals("Number of apps incorrect after # completed check",
|
|
||||||
expectedNumberOfAppsInMemory, rmContext.getRMApps().size());
|
|
||||||
Assert.assertEquals("Number of completed apps incorrect after check",
|
|
||||||
expectedNumberOfAppsInMemory, appMonitor.getCompletedAppsListSize());
|
|
||||||
verify(rmContext.getStateStore(), times(effectiveNumRemoveApps))
|
|
||||||
.removeApplication(isA(RMApp.class));
|
|
||||||
Assert.assertEquals(expectedNumberOfAppsInMemory,
|
|
||||||
appMonitor.getNumberOfCompletedAppsInStateStore());
|
|
||||||
|
|
||||||
List<ApplicationId> completedApps = appMonitor.getCompletedApps();
|
|
||||||
|
|
||||||
Assert.assertEquals(expectedNumberOfAppsInMemory, completedApps.size());
|
|
||||||
Assert.assertEquals(effectiveNumRemoveApps, appsShouldBeRemoved.size());
|
|
||||||
assertTrue(Collections.disjoint(completedApps, appsShouldBeRemoved));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
|
protected void setupDispatcher(RMContext rmContext, Configuration conf) {
|
||||||
|
@ -146,17 +146,6 @@ public abstract class MockAsm extends MockApps {
|
|||||||
public int getMaxAppAttempts() {
|
public int getMaxAppAttempts() {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLogAggregationEnabled() {
|
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLogAggregationFinished() {
|
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ApplicationReport createAndGetApplicationReport(
|
public ApplicationReport createAndGetApplicationReport(
|
||||||
String clientUserName,boolean allowAccess) {
|
String clientUserName,boolean allowAccess) {
|
||||||
|
@ -70,8 +70,6 @@ public class MockRMApp implements RMApp {
|
|||||||
int maxAppAttempts = 1;
|
int maxAppAttempts = 1;
|
||||||
List<ResourceRequest> amReqs;
|
List<ResourceRequest> amReqs;
|
||||||
private Set<String> applicationTags = null;
|
private Set<String> applicationTags = null;
|
||||||
private boolean logAggregationEnabled;
|
|
||||||
private boolean logAggregationFinished;
|
|
||||||
|
|
||||||
public MockRMApp(int newid, long time, RMAppState newState) {
|
public MockRMApp(int newid, long time, RMAppState newState) {
|
||||||
finish = time;
|
finish = time;
|
||||||
@ -238,24 +236,6 @@ public class MockRMApp implements RMApp {
|
|||||||
return maxAppAttempts;
|
return maxAppAttempts;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLogAggregationEnabled() {
|
|
||||||
return logAggregationEnabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isLogAggregationFinished() {
|
|
||||||
return logAggregationFinished;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setLogAggregationEnabled(boolean enabled) {
|
|
||||||
this.logAggregationEnabled = enabled;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setLogAggregationFinished(boolean finished) {
|
|
||||||
this.logAggregationFinished = finished;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setNumMaxRetries(int maxAppAttempts) {
|
public void setNumMaxRetries(int maxAppAttempts) {
|
||||||
this.maxAppAttempts = maxAppAttempts;
|
this.maxAppAttempts = maxAppAttempts;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user