YARN-11531. [Federation] Code cleanup for NodeManager#amrmproxy. (#5841)

This commit is contained in:
slfan1989 2023-07-20 02:50:38 +08:00 committed by GitHub
parent b6b259066f
commit 84dd624768
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 670 additions and 820 deletions

View File

@ -28,7 +28,7 @@ import java.util.Collection;
public abstract class FederationMethodWrapper {
/**
* List of parameters: static and dynamic values, matchings types.
* List of parameters: static and dynamic values, matching types.
*/
private Object[] params;

View File

@ -67,7 +67,7 @@ public interface AMRMProxyApplicationContext {
* Gets the NMContext object.
* @return the NMContext.
*/
Context getNMCotext();
Context getNMContext();
/**
* Gets the credentials of this application.

View File

@ -144,7 +144,7 @@ public class AMRMProxyApplicationContextImpl implements
}
@Override
public Context getNMCotext() {
public Context getNMContext() {
return nmContext;
}

View File

@ -741,7 +741,7 @@ public class AMRMProxyService extends CompositeService implements
YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
List<String> interceptorClassNames = new ArrayList<String>();
List<String> interceptorClassNames = new ArrayList<>();
Collection<String> tempList =
StringUtils.getStringCollection(configuredInterceptorClassNames);
for (String item : tempList) {

View File

@ -72,8 +72,7 @@ public class AMRMProxyTokenSecretManager extends
private NMStateStoreService nmStateStore;
private final Set<ApplicationAttemptId> appAttemptSet =
new HashSet<ApplicationAttemptId>();
private final Set<ApplicationAttemptId> appAttemptSet = new HashSet<>();
/**
* Create an {@link AMRMProxyTokenSecretManager}.
@ -226,8 +225,7 @@ public class AMRMProxyTokenSecretManager extends
.getMasterKey().getKeyId());
byte[] password = this.createPassword(identifier);
appAttemptSet.add(appAttemptId);
return new Token<AMRMTokenIdentifier>(identifier.getBytes(),
password, identifier.getKind(), new Text());
return new Token<>(identifier.getBytes(), password, identifier.getKind(), new Text());
} finally {
this.writeLock.unlock();
}

View File

@ -159,9 +159,9 @@ public abstract class AbstractRequestInterceptor implements
* @return the NMSS instance
*/
public NMStateStoreService getNMStateStore() {
if (this.appContext == null || this.appContext.getNMCotext() == null) {
if (this.appContext == null || this.appContext.getNMContext() == null) {
return null;
}
return this.appContext.getNMCotext().getNMStateStore();
return this.appContext.getNMContext().getNMStateStore();
}
}

View File

@ -95,25 +95,17 @@ public final class DefaultRequestInterceptor extends
private ApplicationMasterProtocol createRMClient(
AMRMProxyApplicationContext appContext, final Configuration conf)
throws IOException, InterruptedException {
if (appContext.getNMCotext().isDistributedSchedulingEnabled()) {
return user.doAs(
new PrivilegedExceptionAction<DistributedSchedulingAMProtocol>() {
@Override
public DistributedSchedulingAMProtocol run() throws Exception {
setAMRMTokenService(conf);
return ServerRMProxy.createRMProxy(conf,
DistributedSchedulingAMProtocol.class);
}
});
if (appContext.getNMContext().isDistributedSchedulingEnabled()) {
return user.doAs((PrivilegedExceptionAction<DistributedSchedulingAMProtocol>) () -> {
setAMRMTokenService(conf);
return ServerRMProxy.createRMProxy(conf, DistributedSchedulingAMProtocol.class);
});
} else {
return user.doAs(
new PrivilegedExceptionAction<ApplicationMasterProtocol>() {
@Override
public ApplicationMasterProtocol run() throws Exception {
setAMRMTokenService(conf);
return ClientRMProxy.createRMProxy(conf,
ApplicationMasterProtocol.class);
}
(PrivilegedExceptionAction<ApplicationMasterProtocol>) () -> {
setAMRMTokenService(conf);
return ClientRMProxy.createRMProxy(conf,
ApplicationMasterProtocol.class);
});
}
}
@ -144,7 +136,7 @@ public final class DefaultRequestInterceptor extends
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws YarnException,
IOException {
if (getApplicationContext().getNMCotext()
if (getApplicationContext().getNMContext()
.isDistributedSchedulingEnabled()) {
LOG.info("Forwarding registerApplicationMasterForDistributedScheduling" +
"request to the real YARN RM");
@ -161,7 +153,7 @@ public final class DefaultRequestInterceptor extends
throws YarnException, IOException {
LOG.debug("Forwarding allocateForDistributedScheduling request" +
"to the real YARN RM");
if (getApplicationContext().getNMCotext()
if (getApplicationContext().getNMContext()
.isDistributedSchedulingEnabled()) {
DistributedSchedulingAllocateResponse allocateResponse =
((DistributedSchedulingAMProtocol)rmClient)
@ -197,7 +189,7 @@ public final class DefaultRequestInterceptor extends
@VisibleForTesting
public void setRMClient(final ApplicationMasterProtocol rmClient) {
if (rmClient instanceof DistributedSchedulingAMProtocol) {
this.rmClient = (DistributedSchedulingAMProtocol)rmClient;
this.rmClient = rmClient;
} else {
this.rmClient = new DistributedSchedulingAMProtocol() {
@Override
@ -254,7 +246,7 @@ public final class DefaultRequestInterceptor extends
String defaultAddr, int defaultPort) {
if (HAUtil.isHAEnabled(conf)) {
// Build a list of service addresses to form the service name
ArrayList<String> services = new ArrayList<String>();
ArrayList<String> services = new ArrayList<>();
YarnConfiguration yarnConf = new YarnConfiguration(conf);
for (String rmId : HAUtil.getRMHAIds(conf)) {
// Set RM_ID to get the corresponding RM_ADDRESS

View File

@ -29,7 +29,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
@ -124,7 +123,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
/**
* When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn
* Registry. Otherwise if NM recovery is enabled, the UAM token are stored in
* Registry. Otherwise, if NM recovery is enabled, the UAM token are stored in
* local NMSS instead under this directory name.
*/
public static final String NMSS_SECONDARY_SC_PREFIX =
@ -150,7 +149,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private AMRMClientRelayer homeRMRelayer;
private SubClusterId homeSubClusterId;
private AMHeartbeatRequestHandler homeHeartbeartHandler;
private AMHeartbeatRequestHandler homeHeartbeatHandler;
/**
* UAM pool for secondary sub-clusters (ones other than home sub-cluster),
@ -162,20 +161,20 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* first time. AM heart beats to them are also handled asynchronously for
* performance reasons.
*/
private UnmanagedAMPoolManager uamPool;
private final UnmanagedAMPoolManager uamPool;
/**
* The rmProxy relayers for secondary sub-clusters that keep track of all
* pending requests.
*/
private Map<String, AMRMClientRelayer> secondaryRelayers;
private final Map<String, AMRMClientRelayer> secondaryRelayers;
/**
* Stores the AllocateResponses that are received asynchronously from all the
* sub-cluster resource managers, including home RM, but not merged and
* returned back to AM yet.
*/
private Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
private final Map<SubClusterId, List<AllocateResponse>> asyncResponseSink;
/**
* Remembers the last allocate response from all known sub-clusters. This is
@ -183,15 +182,15 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* cluster-wide info (e.g. AvailableResource, NumClusterNodes) in the allocate
* response back to AM.
*/
private Map<SubClusterId, AllocateResponse> lastSCResponse;
private final Map<SubClusterId, AllocateResponse> lastSCResponse;
/**
* The async UAM registration result that is not consumed yet.
*/
private Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations;
private final Map<SubClusterId, RegisterApplicationMasterResponse> uamRegistrations;
// For unit test synchronization
private Map<SubClusterId, Future<?>> uamRegisterFutures;
private final Map<SubClusterId, Future<?>> uamRegisterFutures;
/** Thread pool used for asynchronous operations. */
private ExecutorService threadpool;
@ -216,7 +215,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* the container, so that we know which sub-cluster to forward later requests
* about existing containers to.
*/
private Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap;
private final Map<ContainerId, SubClusterId> containerIdToSubClusterIdMap;
/**
* The original registration request that was sent by the AM. This instance is
@ -259,7 +258,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private boolean waitUamRegisterDone;
private MonotonicClock clock = new MonotonicClock();
private final MonotonicClock clock = new MonotonicClock();
/**
* Creates an instance of the FederationInterceptor class.
@ -324,11 +323,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
ApplicationMasterProtocol.class, appOwner), appId,
this.homeSubClusterId.toString());
this.homeHeartbeartHandler =
createHomeHeartbeartHandler(conf, appId, this.homeRMRelayer);
this.homeHeartbeartHandler.setUGI(appOwner);
this.homeHeartbeartHandler.setDaemon(true);
this.homeHeartbeartHandler.start();
this.homeHeartbeatHandler =
createHomeHeartbeatHandler(conf, appId, this.homeRMRelayer);
this.homeHeartbeatHandler.setUGI(appOwner);
this.homeHeartbeatHandler.setDaemon(true);
this.homeHeartbeatHandler.start();
// set lastResponseId to -1 before application master registers
this.lastAllocateResponse =
@ -852,7 +851,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.homeRMRelayer.finishApplicationMaster(request);
// Stop the home heartbeat thread
this.homeHeartbeartHandler.shutdown();
this.homeHeartbeatHandler.shutdown();
if (failedToUnRegister) {
homeResponse.setIsUnregistered(false);
@ -868,9 +867,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private boolean checkRequestFinalApplicationStatusSuccess(
FinishApplicationMasterRequest request) {
if (request != null && request.getFinalApplicationStatus() != null) {
if (request.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED)) {
return true;
}
return request.getFinalApplicationStatus().equals(FinalApplicationStatus.SUCCEEDED);
}
return false;
}
@ -907,7 +904,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
// Stop the home heartbeat thread
this.homeHeartbeartHandler.shutdown();
this.homeHeartbeatHandler.shutdown();
this.homeRMRelayer.shutdown();
// Shutdown needs to clean up app
@ -946,12 +943,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
@VisibleForTesting
protected AMHeartbeatRequestHandler getHomeHeartbeartHandler() {
return this.homeHeartbeartHandler;
protected AMHeartbeatRequestHandler getHomeHeartbeatHandler() {
return this.homeHeartbeatHandler;
}
/**
* Create the UAM pool manager for secondary sub-clsuters. For unit test to
* Create the UAM pool manager for secondary sub-clusters. For unit test to
* override.
*
* @param threadPool the thread pool to use
@ -964,7 +961,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
@VisibleForTesting
protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
protected AMHeartbeatRequestHandler createHomeHeartbeatHandler(
Configuration conf, ApplicationId appId,
AMRMClientRelayer rmProxyRelayer) {
return new AMHeartbeatRequestHandler(conf, appId, rmProxyRelayer);
@ -1052,49 +1049,41 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
final Token<AMRMTokenIdentifier> amrmToken = entry.getValue();
completionService
.submit(new Callable<RegisterApplicationMasterResponse>() {
@Override
public RegisterApplicationMasterResponse call() throws Exception {
RegisterApplicationMasterResponse response = null;
try {
// Create a config loaded with federation on and subclusterId
// for each UAM
YarnConfiguration config = new YarnConfiguration(getConf());
FederationProxyProviderUtil.updateConfForFederation(config,
subClusterId.getId());
.submit(() -> {
RegisterApplicationMasterResponse response = null;
try {
// Create a config loaded with federation on and subclusterId
// for each UAM
YarnConfiguration config = new YarnConfiguration(getConf());
FederationProxyProviderUtil.updateConfForFederation(config,
subClusterId.getId());
ApplicationSubmissionContext originalSubmissionContext =
federationFacade.getApplicationSubmissionContext(appId);
ApplicationSubmissionContext originalSubmissionContext =
federationFacade.getApplicationSubmissionContext(appId);
uamPool.reAttachUAM(subClusterId.getId(), config, appId,
amRegistrationResponse.getQueue(),
getApplicationContext().getUser(), homeSubClusterId.getId(),
amrmToken, subClusterId.toString(), originalSubmissionContext);
uamPool.reAttachUAM(subClusterId.getId(), config, appId,
amRegistrationResponse.getQueue(),
getApplicationContext().getUser(), homeSubClusterId.getId(),
amrmToken, subClusterId.toString(), originalSubmissionContext);
secondaryRelayers.put(subClusterId.getId(),
uamPool.getAMRMClientRelayer(subClusterId.getId()));
secondaryRelayers.put(subClusterId.getId(),
uamPool.getAMRMClientRelayer(subClusterId.getId()));
response = uamPool.registerApplicationMaster(
subClusterId.getId(), amRegistrationRequest);
response = uamPool.registerApplicationMaster(subClusterId.getId(),
amRegistrationRequest);
// Set sub-cluster to be timed out initially
lastSCResponseTime.put(subClusterId,
clock.getTime() - subClusterTimeOut);
// Set sub-cluster to be timed out initially
lastSCResponseTime.put(subClusterId, clock.getTime() - subClusterTimeOut);
if (response != null
&& response.getContainersFromPreviousAttempts() != null) {
cacheAllocatedContainers(
response.getContainersFromPreviousAttempts(),
subClusterId);
}
LOG.info("UAM {} reattached for {}", subClusterId, appId);
} catch (Throwable e) {
LOG.error(
"Reattaching UAM " + subClusterId + " failed for " + appId,
e);
if (response != null && response.getContainersFromPreviousAttempts() != null) {
cacheAllocatedContainers(response.getContainersFromPreviousAttempts(),
subClusterId);
}
return response;
LOG.info("UAM {} reattached for {}", subClusterId, appId);
} catch (Throwable e) {
LOG.error("Reattaching UAM {} failed for {}.", subClusterId, appId, e);
}
return response;
});
}
@ -1115,7 +1104,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
private SubClusterId getSubClusterForNode(String nodeName) {
SubClusterId subClusterId = null;
SubClusterId subClusterId;
try {
subClusterId = this.subClusterResolver.getSubClusterForNode(nodeName);
} catch (YarnException e) {
@ -1139,8 +1128,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private Map<SubClusterId, AllocateRequest> splitAllocateRequest(
AllocateRequest request) throws YarnException {
Map<SubClusterId, AllocateRequest> requestMap =
new HashMap<SubClusterId, AllocateRequest>();
Map<SubClusterId, AllocateRequest> requestMap = new HashMap<>();
// Create heart beat request for home sub-cluster resource manager
findOrCreateAllocateRequestForSubCluster(this.homeSubClusterId, request,
@ -1230,8 +1218,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*
* @param requests contains the heart beat requests to send to the resource
* manager keyed by the sub-cluster id
* @throws YarnException
* @throws IOException
* @throws YarnException exceptions from yarn servers.
* @throws IOException an I/O exception of some sort has occurred.
*/
private void sendRequestsToResourceManagers(
Map<SubClusterId, AllocateRequest> requests)
@ -1255,7 +1243,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
if (subClusterId.equals(this.homeSubClusterId)) {
// Request for the home sub-cluster resource manager
this.homeHeartbeartHandler.allocateAsync(entry.getValue(),
this.homeHeartbeatHandler.allocateAsync(entry.getValue(),
new HeartbeatCallBack(this.homeSubClusterId, false));
} else {
if (!this.uamPool.hasUAMId(subClusterId.getId())) {
@ -1280,7 +1268,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
// list and create and register Unmanaged AM instance for the new ones
List<SubClusterId> newSubClusters = new ArrayList<>();
requests.keySet().stream().forEach(subClusterId -> {
requests.keySet().forEach(subClusterId -> {
String id = subClusterId.getId();
if (!subClusterId.equals(this.homeSubClusterId) && !this.uamPool.hasUAMId(id)) {
newSubClusters.add(subClusterId);
@ -1455,10 +1443,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
List<ContainerStatus> finishedContainers) {
for (ContainerStatus container : finishedContainers) {
LOG.debug("Completed container {}", container);
if (containerIdToSubClusterIdMap
.containsKey(container.getContainerId())) {
containerIdToSubClusterIdMap.remove(container.getContainerId());
}
containerIdToSubClusterIdMap.remove(container.getContainerId());
}
}
@ -1697,7 +1682,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private static AllocateRequest findOrCreateAllocateRequestForSubCluster(
SubClusterId subClusterId, AllocateRequest originalAMRequest,
Map<SubClusterId, AllocateRequest> requestMap) {
AllocateRequest newRequest = null;
AllocateRequest newRequest;
if (requestMap.containsKey(subClusterId)) {
newRequest = requestMap.get(subClusterId);
} else {
@ -1715,14 +1700,14 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
private static AllocateRequest createAllocateRequest() {
AllocateRequest request =
RECORD_FACTORY.newRecordInstance(AllocateRequest.class);
request.setAskList(new ArrayList<ResourceRequest>());
request.setReleaseList(new ArrayList<ContainerId>());
request.setAskList(new ArrayList<>());
request.setReleaseList(new ArrayList<>());
ResourceBlacklistRequest blackList =
ResourceBlacklistRequest.newInstance(null, null);
blackList.setBlacklistAdditions(new ArrayList<String>());
blackList.setBlacklistRemovals(new ArrayList<String>());
blackList.setBlacklistAdditions(new ArrayList<>());
blackList.setBlacklistRemovals(new ArrayList<>());
request.setResourceBlacklistRequest(blackList);
request.setUpdateRequests(new ArrayList<UpdateContainerRequest>());
request.setUpdateRequests(new ArrayList<>());
return request;
}
@ -1738,9 +1723,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
long duration = this.clock.getTime() - entry.getValue();
if (duration > this.subClusterTimeOut) {
if (verbose) {
LOG.warn(
"Subcluster {} doesn't have a successful heartbeat"
+ " for {} seconds for {}",
LOG.warn("Subcluster {} doesn't have a successful heartbeat for {} seconds for {}",
entry.getKey(), (double) duration / 1000, this.attemptId);
}
timedOutSCs.add(entry.getKey());
@ -1810,8 +1793,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* Async callback handler for heart beat response from all sub-clusters.
*/
private class HeartbeatCallBack implements AsyncCallback<AllocateResponse> {
private SubClusterId subClusterId;
private boolean isUAM;
private final SubClusterId subClusterId;
private final boolean isUAM;
HeartbeatCallBack(SubClusterId subClusterId, boolean isUAM) {
this.subClusterId = subClusterId;
@ -1823,7 +1806,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
org.apache.hadoop.yarn.api.records.Token amrmToken =
response.getAMRMToken();
synchronized (asyncResponseSink) {
List<AllocateResponse> responses = null;
List<AllocateResponse> responses;
if (asyncResponseSink.containsKey(subClusterId)) {
responses = asyncResponseSink.get(subClusterId);
} else {
@ -1846,8 +1829,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
try {
policyInterpreter.notifyOfResponse(subClusterId, response);
} catch (YarnException e) {
LOG.warn("notifyOfResponse for policy failed for sub-cluster "
+ subClusterId, e);
LOG.warn("notifyOfResponse for policy failed for sub-cluster {}.", subClusterId, e);
}
// Save the new AMRMToken for the UAM if present
@ -1866,11 +1848,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
AMRMTokenIdentifier identifier = new AMRMTokenIdentifier();
identifier.readFields(new DataInputStream(
new ByteArrayInputStream(newToken.getIdentifier())));
LOG.info(
"Received new UAM amrmToken with keyId {} and "
+ "service {} from {} for {}, written to Registry",
identifier.getKeyId(), newToken.getService(), subClusterId,
attemptId);
LOG.info("Received new UAM amrmToken with keyId {} and service {} from {} for {}, " +
"written to Registry", identifier.getKeyId(), newToken.getService(),
subClusterId, attemptId);
} catch (IOException e) {
}
}
@ -1881,7 +1861,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
newToken.encodeToUrlString().getBytes(STRING_TO_BYTE_FORMAT));
} catch (IOException e) {
LOG.error("Error storing UAM token as AMRMProxy "
+ "context entry in NMSS for " + attemptId, e);
+ "context entry in NMSS for {}.", attemptId, e);
}
}
}
@ -1893,8 +1873,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* FinishApplicationMasterResponse instances.
*/
private static class FinishApplicationMasterResponseInfo {
private FinishApplicationMasterResponse response;
private String subClusterId;
private final FinishApplicationMasterResponse response;
private final String subClusterId;
FinishApplicationMasterResponseInfo(
FinishApplicationMasterResponse response, String subClusterId) {

View File

@ -87,11 +87,11 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
public void init(AMRMProxyApplicationContext applicationContext) {
super.init(applicationContext);
initLocal(applicationContext.getNMCotext().getNodeStatusUpdater()
initLocal(applicationContext.getNMContext().getNodeStatusUpdater()
.getRMIdentifier(),
applicationContext.getApplicationAttemptId(),
applicationContext.getNMCotext().getContainerAllocator(),
applicationContext.getNMCotext().getNMTokenSecretManager(),
applicationContext.getNMContext().getContainerAllocator(),
applicationContext.getNMContext().getNMTokenSecretManager(),
applicationContext.getUser());
}

View File

@ -82,7 +82,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorCompletionService;
@ -103,8 +102,7 @@ public abstract class BaseAMRMProxyTest {
private MockAMRMProxyService amrmProxyService;
// Thread pool used for asynchronous operations
private static ExecutorService threadpool = Executors
.newCachedThreadPool();
private final ExecutorService threadpool = Executors.newCachedThreadPool();
private Configuration conf;
private AsyncDispatcher dispatcher;
private Context nmContext;
@ -230,47 +228,37 @@ public abstract class BaseAMRMProxyTest {
protected <T, R> List<R> runInParallel(List<T> testContexts,
final Function<T, R> func) {
ExecutorCompletionService<R> completionService =
new ExecutorCompletionService<R>(this.getThreadPool());
LOG.info("Sending requests to endpoints asynchronously. Number of test contexts="
+ testContexts.size());
for (int index = 0; index < testContexts.size(); index++) {
final T testContext = testContexts.get(index);
new ExecutorCompletionService<>(this.getThreadPool());
LOG.info("Sending requests to endpoints asynchronously. Number of test contexts = {}.",
testContexts.size());
for (final T testContext : testContexts) {
LOG.info("Adding request to threadpool for test context: {}.", testContext.toString());
LOG.info("Adding request to threadpool for test context: "
+ testContext.toString());
completionService.submit(() -> {
LOG.info("Sending request. Test context: {}.", testContext);
completionService.submit(new Callable<R>() {
@Override
public R call() throws Exception {
LOG.info("Sending request. Test context:"
+ testContext.toString());
R response = null;
try {
response = func.invoke(testContext);
LOG.info("Successfully sent request for context: "
+ testContext.toString());
} catch (Throwable ex) {
LOG.error("Failed to process request for context: "
+ testContext);
response = null;
}
return response;
R response;
try {
response = func.invoke(testContext);
LOG.info("Successfully sent request for context: {}.", testContext);
} catch (Throwable ex) {
LOG.error("Failed to process request for context: {}.", testContext);
response = null;
}
return response;
});
}
ArrayList<R> responseList = new ArrayList<R>();
LOG.info("Waiting for responses from endpoints. Number of contexts="
+ testContexts.size());
ArrayList<R> responseList = new ArrayList<>();
LOG.info("Waiting for responses from endpoints. Number of contexts = {}.", testContexts.size());
for (int i = 0; i < testContexts.size(); ++i) {
try {
final Future<R> future = completionService.take();
final R response = future.get(3000, TimeUnit.MILLISECONDS);
responseList.add(response);
} catch (Throwable e) {
LOG.error("Failed to process request " + e.getMessage());
LOG.error("Failed to process request {}", e.getMessage());
}
}
@ -291,29 +279,19 @@ public abstract class BaseAMRMProxyTest {
final int testAppId) throws Exception, YarnException, IOException {
final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
return ugi
.getUser()
.doAs(
new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
@Override
public RegisterApplicationMasterResponse run()
throws Exception {
getAMRMProxyService().initApp(
ugi.getAppAttemptId(),
ugi.getUser().getUserName());
return ugi.getUser().doAs((PrivilegedExceptionAction<RegisterApplicationMasterResponse>) () -> {
getAMRMProxyService().initApp(ugi.getAppAttemptId(), ugi.getUser().getUserName());
final RegisterApplicationMasterRequest req =
Records
.newRecord(RegisterApplicationMasterRequest.class);
req.setHost(Integer.toString(testAppId));
req.setRpcPort(testAppId);
req.setTrackingUrl("");
final RegisterApplicationMasterRequest req =
Records.newRecord(RegisterApplicationMasterRequest.class);
req.setHost(Integer.toString(testAppId));
req.setRpcPort(testAppId);
req.setTrackingUrl("");
RegisterApplicationMasterResponse response =
getAMRMProxyService().registerApplicationMaster(req);
return response;
}
});
RegisterApplicationMasterResponse response =
getAMRMProxyService().registerApplicationMaster(req);
return response;
});
}
/**
@ -327,37 +305,30 @@ public abstract class BaseAMRMProxyTest {
final ArrayList<T> testContexts) {
List<RegisterApplicationMasterResponseInfo<T>> responses =
runInParallel(testContexts,
new Function<T, RegisterApplicationMasterResponseInfo<T>>() {
@Override
public RegisterApplicationMasterResponseInfo<T> invoke(
T testContext) {
RegisterApplicationMasterResponseInfo<T> response = null;
try {
int index = testContexts.indexOf(testContext);
response =
new RegisterApplicationMasterResponseInfo<T>(
registerApplicationMaster(index), testContext);
Assert.assertNotNull(response.getResponse());
Assert.assertEquals(Integer.toString(index), response
.getResponse().getQueue());
testContext -> {
RegisterApplicationMasterResponseInfo<T> response;
try {
int index = testContexts.indexOf(testContext);
response = new RegisterApplicationMasterResponseInfo<>(
registerApplicationMaster(index), testContext);
Assert.assertNotNull(response.getResponse());
Assert.assertEquals(Integer.toString(index), response.getResponse().getQueue());
LOG.info("Successfully registered application master with test context: "
+ testContext);
} catch (Throwable ex) {
response = null;
LOG.error("Failed to register application master with test context: "
+ testContext);
}
LOG.info("Successfully registered application master with test context: {}.",
testContext);
} catch (Throwable ex) {
response = null;
LOG.error("Failed to register application master with test context: {}.",
testContext);
}
return response;
}
});
return response;
});
Assert.assertEquals(
"Number of responses received does not match with request",
Assert.assertEquals("Number of responses received does not match with request",
testContexts.size(), responses.size());
Set<T> contextResponses = new TreeSet<T>();
Set<T> contextResponses = new TreeSet<>();
for (RegisterApplicationMasterResponseInfo<T> item : responses) {
contextResponses.add(item.getTestContext());
}
@ -410,37 +381,28 @@ public abstract class BaseAMRMProxyTest {
final ArrayList<T> testContexts) {
List<FinishApplicationMasterResponseInfo<T>> responses =
runInParallel(testContexts,
new Function<T, FinishApplicationMasterResponseInfo<T>>() {
@Override
public FinishApplicationMasterResponseInfo<T> invoke(
T testContext) {
FinishApplicationMasterResponseInfo<T> response = null;
try {
response =
new FinishApplicationMasterResponseInfo<T>(
finishApplicationMaster(
testContexts.indexOf(testContext),
FinalApplicationStatus.SUCCEEDED),
testContext);
Assert.assertNotNull(response.getResponse());
testContext -> {
FinishApplicationMasterResponseInfo<T> response;
try {
response = new FinishApplicationMasterResponseInfo<>(
finishApplicationMaster(testContexts.indexOf(testContext),
FinalApplicationStatus.SUCCEEDED), testContext);
Assert.assertNotNull(response.getResponse());
LOG.info("Successfully finished application master with test contexts: "
+ testContext);
} catch (Throwable ex) {
response = null;
LOG.error("Failed to finish application master with test context: "
+ testContext);
}
return response;
LOG.info("Successfully finished application master with test contexts: {}.",
testContext);
} catch (Throwable ex) {
response = null;
LOG.error("Failed to finish application master with test context: {}.",
testContext);
}
return response;
});
Assert.assertEquals(
"Number of responses received does not match with request",
Assert.assertEquals("Number of responses received does not match with request",
testContexts.size(), responses.size());
Set<T> contextResponses = new TreeSet<T>();
Set<T> contextResponses = new TreeSet<>();
for (FinishApplicationMasterResponseInfo<T> item : responses) {
Assert.assertNotNull(item);
Assert.assertNotNull(item.getResponse());
@ -455,27 +417,19 @@ public abstract class BaseAMRMProxyTest {
}
protected AllocateResponse allocate(final int testAppId)
throws Exception, YarnException, IOException {
throws Exception {
final AllocateRequest req = Records.newRecord(AllocateRequest.class);
req.setResponseId(testAppId);
return allocate(testAppId, req);
}
protected AllocateResponse allocate(final int testAppId,
final AllocateRequest request) throws Exception, YarnException,
IOException {
protected AllocateResponse allocate(final int testAppId, final AllocateRequest request)
throws Exception {
final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
return ugi.getUser().doAs(
new PrivilegedExceptionAction<AllocateResponse>() {
@Override
public AllocateResponse run() throws Exception {
AllocateResponse response =
getAMRMProxyService().allocate(request);
return response;
}
});
return ugi.getUser().doAs((PrivilegedExceptionAction<AllocateResponse>)
() -> getAMRMProxyService().allocate(request));
}
protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) {
@ -490,44 +444,37 @@ public abstract class BaseAMRMProxyTest {
}
protected List<ResourceRequest> createResourceRequests(String[] hosts,
int memory, int vCores, int priority, int containers)
throws Exception {
int memory, int vCores, int priority, int containers) {
return createResourceRequests(hosts, memory, vCores, priority,
containers, null);
}
protected List<ResourceRequest> createResourceRequests(String[] hosts,
int memory, int vCores, int priority, int containers,
String labelExpression) throws Exception {
List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
int memory, int vCores, int priority, int containers, String labelExpression) {
List<ResourceRequest> reqs = new ArrayList<>();
for (String host : hosts) {
ResourceRequest hostReq =
createResourceRequest(host, memory, vCores, priority,
containers, labelExpression);
ResourceRequest hostReq = createResourceRequest(host, memory, vCores, priority,
containers, labelExpression);
reqs.add(hostReq);
ResourceRequest rackReq =
createResourceRequest("/default-rack", memory, vCores, priority,
containers, labelExpression);
ResourceRequest rackReq = createResourceRequest("/default-rack", memory, vCores, priority,
containers, labelExpression);
reqs.add(rackReq);
}
ResourceRequest offRackReq =
createResourceRequest(ResourceRequest.ANY, memory, vCores,
priority, containers, labelExpression);
ResourceRequest offRackReq = createResourceRequest(ResourceRequest.ANY, memory, vCores,
priority, containers, labelExpression);
reqs.add(offRackReq);
return reqs;
}
protected ResourceRequest createResourceRequest(String resource,
int memory, int vCores, int priority, int containers)
throws Exception {
return createResourceRequest(resource, memory, vCores, priority,
containers, null);
int memory, int vCores, int priority, int containers) {
return createResourceRequest(resource, memory, vCores, priority, containers, null);
}
protected ResourceRequest createResourceRequest(String resource,
int memory, int vCores, int priority, int containers,
String labelExpression) throws Exception {
String labelExpression) {
ResourceRequest req = Records.newRecord(ResourceRequest.class);
req.setResourceName(resource);
req.setNumContainers(containers);
@ -548,8 +495,8 @@ public abstract class BaseAMRMProxyTest {
/**
* Returns an ApplicationId with the specified identifier
*
* @param testAppId
* @return
* @param testAppId testApplication.
* @return ApplicationId.
*/
protected ApplicationId getApplicationId(int testAppId) {
return ApplicationId.newInstance(123456, testAppId);
@ -559,8 +506,8 @@ public abstract class BaseAMRMProxyTest {
* Return an instance of ApplicationAttemptId using specified identifier. This
* identifier will be used for the ApplicationId too.
*
* @param testAppId
* @return
* @param testAppId testApplicationId.
* @return ApplicationAttemptId.
*/
protected ApplicationAttemptId getApplicationAttemptId(int testAppId) {
return ApplicationAttemptId.newInstance(getApplicationId(testAppId),
@ -571,8 +518,8 @@ public abstract class BaseAMRMProxyTest {
* Return an instance of ApplicationAttemptId using specified identifier and
* application id
*
* @param testAppId
* @return
* @param testAppId testApplicationId.
* @return ApplicationAttemptId.
*/
protected ApplicationAttemptId getApplicationAttemptId(int testAppId,
ApplicationId appId) {
@ -580,8 +527,8 @@ public abstract class BaseAMRMProxyTest {
}
protected static class RegisterApplicationMasterResponseInfo<T> {
private RegisterApplicationMasterResponse response;
private T testContext;
private final RegisterApplicationMasterResponse response;
private final T testContext;
RegisterApplicationMasterResponseInfo(
RegisterApplicationMasterResponse response, T testContext) {
@ -599,8 +546,8 @@ public abstract class BaseAMRMProxyTest {
}
protected static class FinishApplicationMasterResponseInfo<T> {
private FinishApplicationMasterResponse response;
private T testContext;
private final FinishApplicationMasterResponse response;
private final T testContext;
FinishApplicationMasterResponseInfo(
FinishApplicationMasterResponse response, T testContext) {
@ -618,8 +565,8 @@ public abstract class BaseAMRMProxyTest {
}
protected static class ApplicationUserInfo {
private UserGroupInformation user;
private ApplicationAttemptId attemptId;
private final UserGroupInformation user;
private final ApplicationAttemptId attemptId;
ApplicationUserInfo(UserGroupInformation user,
ApplicationAttemptId attemptId) {
@ -654,12 +601,12 @@ public abstract class BaseAMRMProxyTest {
* actual service, the initialization is called by the
* ContainerManagerImpl::StartContainers method
*
* @param applicationId
* @param user
* @param applicationId ApplicationAttemptId
* @param user username.
*/
public void initApp(ApplicationAttemptId applicationId, String user) {
super.initializePipeline(applicationId, user,
new Token<AMRMTokenIdentifier>(), null, null, false, null);
new Token<>(), null, null, false, null);
}
public void stopApp(ApplicationId applicationId) {
@ -672,7 +619,7 @@ public abstract class BaseAMRMProxyTest {
* invoked asynchronously at a later point.
*/
protected interface Function<T, R> {
public R invoke(T input);
R invoke(T input);
}
protected class NullContext implements Context {

View File

@ -86,7 +86,7 @@ public class TestAMRMProxyMetrics extends BaseAMRMProxyTest {
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
Assert.assertEquals(failedAppStartRequests, metrics.getFailedAppStartRequests());
Assert.assertEquals(failedRegisterAMRequests, metrics.getFailedRegisterAMRequests());

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
@ -58,6 +57,8 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
/**
* Test if the pipeline is created properly.
*
* @throws Exception There was an error registerApplicationMaster.
*/
@Test
public void testRequestInterceptorChainCreation() throws Exception {
@ -73,8 +74,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
root.getClass().getName());
break;
case 3:
Assert.assertEquals(MockRequestInterceptor.class.getName(), root
.getClass().getName());
Assert.assertEquals(MockRequestInterceptor.class.getName(), root.getClass().getName());
break;
}
@ -82,8 +82,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
index++;
}
Assert.assertEquals(
"The number of interceptors in chain does not match",
Assert.assertEquals("The number of interceptors in chain does not match",
Integer.toString(4), Integer.toString(index));
}
@ -91,7 +90,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
/**
* Tests registration of a single application master.
*
* @throws Exception
* @throws Exception There was an error registerApplicationMaster.
*/
@Test
public void testRegisterOneApplicationMaster() throws Exception {
@ -99,8 +98,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
// manager return it as the queue name. Assert that we received the queue
// name
int testAppId = 1;
RegisterApplicationMasterResponse response1 =
registerApplicationMaster(testAppId);
RegisterApplicationMasterResponse response1 = registerApplicationMaster(testAppId);
Assert.assertNotNull(response1);
Assert.assertEquals(Integer.toString(testAppId), response1.getQueue());
}
@ -108,7 +106,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
/**
* Tests the case when interceptor pipeline initialization fails.
*
* @throws IOException
* @throws IOException There was an error registerApplicationMaster.
*/
@Test
public void testInterceptorInitFailure() throws IOException {
@ -127,9 +125,8 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
Map<ApplicationId, RequestInterceptorChainWrapper> pipelines =
getAMRMProxyService().getPipelines();
ApplicationId id = getApplicationId(testAppId);
Assert.assertTrue(
"The interceptor pipeline should be removed if initialization fails",
pipelines.get(id) == null);
Assert.assertNull("The interceptor pipeline should be removed if initialization fails",
pipelines.get(id));
}
}
@ -137,28 +134,24 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
* Tests the registration of multiple application master serially one at a
* time.
*
* @throws Exception
* @throws Exception There was an error registerApplicationMaster.
*/
@Test
public void testRegisterMultipleApplicationMasters() throws Exception {
for (int testAppId = 0; testAppId < 3; testAppId++) {
RegisterApplicationMasterResponse response =
registerApplicationMaster(testAppId);
RegisterApplicationMasterResponse response = registerApplicationMaster(testAppId);
Assert.assertNotNull(response);
Assert
.assertEquals(Integer.toString(testAppId), response.getQueue());
Assert.assertEquals(Integer.toString(testAppId), response.getQueue());
}
}
/**
* Tests the registration of multiple application masters using multiple
* threads in parallel.
*
* @throws Exception
*
*/
@Test
public void testRegisterMultipleApplicationMastersInParallel()
throws Exception {
public void testRegisterMultipleApplicationMastersInParallel() {
int numberOfRequests = 5;
ArrayList<String> testContexts =
CreateTestRequestIdentifiers(numberOfRequests);
@ -167,10 +160,10 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
private ArrayList<String> CreateTestRequestIdentifiers(
int numberOfRequests) {
ArrayList<String> testContexts = new ArrayList<String>();
ArrayList<String> testContexts = new ArrayList<>();
LOG.info("Creating " + numberOfRequests + " contexts for testing");
for (int ep = 0; ep < numberOfRequests; ep++) {
testContexts.add("test-endpoint-" + Integer.toString(ep));
testContexts.add("test-endpoint-" + ep);
LOG.info("Created test context: " + testContexts.get(ep));
}
return testContexts;
@ -190,7 +183,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
}
@Test
@ -219,7 +212,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
}
@Test
public void testFinishInvalidApplicationMaster() throws Exception {
public void testFinishInvalidApplicationMaster() {
try {
// Try to finish an application master that was not registered.
finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
@ -248,11 +241,10 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
// Assert that the application has been removed from the collection
Assert.assertTrue(this.getAMRMProxyService()
.getPipelines().size() == index);
Assert.assertEquals(this.getAMRMProxyService().getPipelines().size(), index);
}
try {
@ -280,10 +272,10 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
public void testFinishMultipleApplicationMastersInParallel()
throws Exception {
int numberOfRequests = 5;
ArrayList<String> testContexts = new ArrayList<String>();
LOG.info("Creating " + numberOfRequests + " contexts for testing");
ArrayList<String> testContexts = new ArrayList<>();
LOG.info("Creating {} contexts for testing", numberOfRequests);
for (int i = 0; i < numberOfRequests; i++) {
testContexts.add("test-endpoint-" + Integer.toString(i));
testContexts.add("test-endpoint-" + i);
LOG.info("Created test context: " + testContexts.get(i));
RegisterApplicationMasterResponse registerResponse =
@ -313,11 +305,11 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
}
@Test
public void testAllocateRequestWithoutRegistering() throws Exception {
public void testAllocateRequestWithoutRegistering() {
try {
// Try to allocate an application master without registering.
@ -381,51 +373,41 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
public void testAllocateAndReleaseContainersForMultipleAMInParallel()
throws Exception {
int numberOfApps = 6;
ArrayList<Integer> tempAppIds = new ArrayList<Integer>();
ArrayList<Integer> tempAppIds = new ArrayList<>();
for (int i = 0; i < numberOfApps; i++) {
tempAppIds.add(new Integer(i));
tempAppIds.add(i);
}
final ArrayList<Integer> appIds = tempAppIds;
List<Integer> responses =
runInParallel(appIds, new Function<Integer, Integer>() {
@Override
public Integer invoke(Integer testAppId) {
try {
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull("response is null", registerResponse);
List<Container> containers =
getContainersAndAssert(testAppId, 10);
releaseContainersAndAssert(testAppId, containers);
runInParallel(tempAppIds, testAppId -> {
try {
RegisterApplicationMasterResponse registerResponse =
registerApplicationMaster(testAppId);
Assert.assertNotNull("response is null", registerResponse);
List<Container> containers =
getContainersAndAssert(testAppId, 10);
releaseContainersAndAssert(testAppId, containers);
LOG.info("Successfully registered application master with appId: "
+ testAppId);
} catch (Throwable ex) {
LOG.error(
"Failed to register application master with appId: "
+ testAppId, ex);
testAppId = null;
}
return testAppId;
LOG.info("Successfully registered application master with appId: {}", testAppId);
} catch (Throwable ex) {
LOG.error("Failed to register application master with appId: {}", testAppId, ex);
testAppId = null;
}
return testAppId;
});
Assert.assertEquals(
"Number of responses received does not match with request",
appIds.size(), responses.size());
Assert.assertEquals("Number of responses received does not match with request",
tempAppIds.size(), responses.size());
for (Integer testAppId : responses) {
Assert.assertNotNull(testAppId);
finishApplicationMaster(testAppId.intValue(),
FinalApplicationStatus.SUCCEEDED);
finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
}
}
@Test
public void testMultipleAttemptsSameNode()
throws YarnException, IOException, Exception {
public void testMultipleAttemptsSameNode() throws Exception {
String user = "hadoop";
ApplicationId appId = ApplicationId.newInstance(1, 1);
@ -444,7 +426,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
applicationAttemptId = ApplicationAttemptId.newInstance(appId, 2);
getAMRMProxyService().initializePipeline(applicationAttemptId, user,
new Token<AMRMTokenIdentifier>(), null, null, false, null);
new Token<>(), null, null, false, null);
RequestInterceptorChainWrapper chain2 =
getAMRMProxyService().getPipelines().get(appId);
@ -461,13 +443,10 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
allocateRequest.setResponseId(1);
List<Container> containers =
new ArrayList<Container>(numberOfResourceRequests);
List<ResourceRequest> askList =
new ArrayList<ResourceRequest>(numberOfResourceRequests);
new ArrayList<>(numberOfResourceRequests);
List<ResourceRequest> askList = new ArrayList<>(numberOfResourceRequests);
for (int testAppId = 0; testAppId < numberOfResourceRequests; testAppId++) {
askList.add(createResourceRequest(
"test-node-" + Integer.toString(testAppId), 6000, 2,
testAppId % 5, 1));
askList.add(createResourceRequest("test-node-" + testAppId, 6000, 2, testAppId % 5, 1));
}
allocateRequest.setAskList(askList);
@ -495,11 +474,9 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers()
.size()));
LOG.info("Total number of allocated containers: "
+ Integer.toString(containers.size()));
LOG.info("Number of allocated containers in this request: {}.",
allocateResponse.getAllocatedContainers().size());
LOG.info("Total number of allocated containers: {}.", containers.size());
Thread.sleep(10);
}
@ -517,8 +494,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(1);
List<ContainerId> relList =
new ArrayList<ContainerId>(containers.size());
List<ContainerId> relList = new ArrayList<>(containers.size());
for (Container container : containers) {
relList.add(container.getId());
}
@ -556,23 +532,21 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers()
.size()));
LOG.info("Total number of containers received: "
+ Integer.toString(containersForReleasedContainerIds.size()));
LOG.info("Number of containers received in this request: {}.",
allocateResponse.getAllocatedContainers().size());
LOG.info("Total number of containers received: {}.",
containersForReleasedContainerIds.size());
Thread.sleep(10);
}
Assert.assertEquals(relList.size(),
containersForReleasedContainerIds.size());
Assert.assertEquals(relList.size(), containersForReleasedContainerIds.size());
}
/**
* Test AMRMProxy restart with recovery.
*/
@Test
public void testRecovery() throws YarnException, Exception {
public void testRecovery() throws Exception {
Configuration conf = createConfiguration();
// Use the MockRequestInterceptorAcrossRestart instead for the chain
@ -602,7 +576,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
// At the time of kill, app1 just registerAM, app2 already did one allocate.
// Both application should be recovered
createAndStartAMRMProxyService(conf);
Assert.assertTrue(getAMRMProxyService().getPipelines().size() == 2);
Assert.assertEquals(2, getAMRMProxyService().getPipelines().size());
allocateResponse = allocate(testAppId1);
Assert.assertNotNull(allocateResponse);
@ -610,7 +584,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
FinishApplicationMasterResponse finishResponse =
finishApplicationMaster(testAppId1, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
allocateResponse = allocate(testAppId2);
Assert.assertNotNull(allocateResponse);
@ -619,7 +593,7 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
finishApplicationMaster(testAppId2, FinalApplicationStatus.SUCCEEDED);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
Assert.assertTrue(finishResponse.getIsUnregistered());
int testAppId3 = 3;
try {
@ -664,23 +638,21 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
@Test
public void testCheckIfAppExistsInStateStore()
throws IOException, YarnException {
throws IOException {
ApplicationId appId = ApplicationId.newInstance(0, 0);
Configuration conf = createConfiguration();
conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
createAndStartAMRMProxyService(conf);
Assert.assertEquals(false,
getAMRMProxyService().checkIfAppExistsInStateStore(appId));
Assert.assertFalse(getAMRMProxyService().checkIfAppExistsInStateStore(appId));
Configuration distConf = createConfiguration();
conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true);
createAndStartAMRMProxyService(distConf);
Assert.assertEquals(true,
getAMRMProxyService().checkIfAppExistsInStateStore(appId));
Assert.assertTrue(getAMRMProxyService().checkIfAppExistsInStateStore(appId));
}
/**

View File

@ -203,7 +203,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
List<Container> containers = new ArrayList<>(numberOfResourceRequests);
List<ResourceRequest> askList = new ArrayList<>(numberOfResourceRequests);
for (int id = 0; id < numberOfResourceRequests; id++) {
askList.add(createResourceRequest("test-node-" + Integer.toString(id),
askList.add(createResourceRequest("test-node-" + id,
6000, 2, id % 5, 1));
}
@ -217,7 +217,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in the original request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
+ allocateResponse.getAllocatedContainers().size());
// Send max 10 heart beats to receive all the containers. If not, we will
// fail the test
@ -236,10 +236,9 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
interceptor.drainAllAsyncQueue(false);
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
LOG.info("Total number of allocated containers: "
+ Integer.toString(containers.size()));
LOG.info("Number of allocated containers in this request: {}.",
allocateResponse.getAllocatedContainers().size());
LOG.info("Total number of allocated containers: {}.", containers.size());
Thread.sleep(10);
}
Assert.assertEquals(numberOfAllocationExcepted, containers.size());
@ -250,7 +249,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
throws Exception {
Assert.assertTrue(containers.size() > 0);
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
List<ContainerId> relList = new ArrayList<>(containers.size());
for (Container container : containers) {
relList.add(container.getId());
}
@ -267,8 +266,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
// The release containers returned by the mock resource managers will be
// aggregated and returned back to us, and we can check if total request size
// and returned size are the same
List<ContainerId> containersForReleasedContainerIds =
new ArrayList<ContainerId>();
List<ContainerId> containersForReleasedContainerIds = new ArrayList<>();
List<ContainerId> newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
@ -306,7 +304,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
private void checkAMRMToken(Token amrmToken) {
if (amrmToken != null) {
// The token should be the one issued by home MockRM
Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0)));
Assert.assertEquals(Integer.toString(0), amrmToken.getKind());
}
}
@ -314,69 +312,66 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
public void testMultipleSubClusters() throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers, with sc1 and sc2 active
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance("SC-2"));
// Allocate the first batch of containers, with sc1 and sc2 active
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance("SC-2"));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(2, interceptor.getUnmanagedAMPoolSize());
// Allocate the second batch of containers, with sc1 and sc3 active
deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
registerSubCluster(SubClusterId.newInstance("SC-3"));
// Allocate the second batch of containers, with sc1 and sc3 active
deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
registerSubCluster(SubClusterId.newInstance("SC-3"));
numberOfContainers = 1;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
numberOfContainers = 1;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 2));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
// Allocate the third batch of containers with only in home sub-cluster
// active
deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
// Allocate the third batch of containers with only in home sub-cluster
// active
deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
numberOfContainers = 2;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers * 1));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
numberOfContainers = 2;
containers.addAll(
getContainersAndAssert(numberOfContainers, numberOfContainers));
Assert.assertEquals(3, interceptor.getUnmanagedAMPoolSize());
// Release all containers
releaseContainersAndAssert(containers);
// Release all containers
releaseContainersAndAssert(containers);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
return null;
}
return null;
});
}
@ -387,55 +382,52 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
public void testReregister() throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate the first batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
// Allocate the first batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
interceptor.setShouldReRegisterNext();
interceptor.setShouldReRegisterNext();
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
interceptor.setShouldReRegisterNext();
interceptor.setShouldReRegisterNext();
// Release all containers
releaseContainersAndAssert(containers);
// Release all containers
releaseContainersAndAssert(containers);
interceptor.setShouldReRegisterNext();
interceptor.setShouldReRegisterNext();
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
return null;
}
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
return null;
});
}
@ -498,7 +490,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
implements Callable<RegisterApplicationMasterResponse> {
@Override
public RegisterApplicationMasterResponse call() throws Exception {
RegisterApplicationMasterResponse response = null;
RegisterApplicationMasterResponse response;
try {
// Use port number 1001 to let mock RM block in the register call
response = interceptor.registerApplicationMaster(
@ -536,110 +528,107 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
interceptor = new TestableFederationInterceptor();
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registryObj));
interceptor.cleanupRegistry();
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
interceptor = new TestableFederationInterceptor();
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registryObj));
interceptor.cleanupRegistry();
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate one batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
// Allocate one batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Prepare for Federation Interceptor restart and recover
Map<String, byte[]> recoveredDataMap =
recoverDataMapForAppAttempt(nmStateStore, attemptId);
String scEntry =
FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1";
if (registryObj == null) {
Assert.assertTrue(recoveredDataMap.containsKey(scEntry));
} else {
// When AMRMPRoxy HA is enabled, NMSS should not have the UAM token,
// it should be in Registry
Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
}
// Preserve the mock RM instances
MockResourceManagerFacade homeRM = interceptor.getHomeRM();
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Create a new interceptor instance and recover
interceptor = new TestableFederationInterceptor(homeRM, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registryObj));
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
// The first allocate call expects a fail-over exception and re-register
try {
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse =
interceptor.allocate(allocateRequest);
lastResponseId = allocateResponse.getResponseId();
Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
+ " after FederationInterceptor restarts and recovers");
} catch (ApplicationMasterNotRegisteredException e) {
}
interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
// Release all containers
releaseContainersAndAssert(containers);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
// After the application succeeds, the registry/NMSS entry should be
// cleaned up
if (registryObj != null) {
Assert.assertEquals(0,
interceptor.getRegistryClient().getAllApplications().size());
} else {
recoveredDataMap =
recoverDataMapForAppAttempt(nmStateStore, attemptId);
Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
}
return null;
// Prepare for Federation Interceptor restart and recover
Map<String, byte[]> recoveredDataMap =
recoverDataMapForAppAttempt(nmStateStore, attemptId);
String scEntry =
FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1";
if (registryObj == null) {
Assert.assertTrue(recoveredDataMap.containsKey(scEntry));
} else {
// When AMRMPRoxy HA is enabled, NMSS should not have the UAM token,
// it should be in Registry
Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
}
// Preserve the mock RM instances
MockResourceManagerFacade homeRM = interceptor.getHomeRM();
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Create a new interceptor instance and recover
interceptor = new TestableFederationInterceptor(homeRM, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registryObj));
interceptor.recover(recoveredDataMap);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
// The first allocate call expects a fail-over exception and re-register
try {
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse =
interceptor.allocate(allocateRequest);
lastResponseId = allocateResponse.getResponseId();
Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
+ " after FederationInterceptor restarts and recovers");
} catch (ApplicationMasterNotRegisteredException e) {
}
interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
// Release all containers
releaseContainersAndAssert(containers);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
// After the application succeeds, the registry/NMSS entry should be
// cleaned up
if (registryObj != null) {
Assert.assertEquals(0,
interceptor.getRegistryClient().getAllApplications().size());
} else {
recoveredDataMap =
recoverDataMapForAppAttempt(nmStateStore, attemptId);
Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
}
return null;
});
}
@ -774,53 +763,48 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
public void testSubClusterTimeOut() throws Exception {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application first time
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
// Register the application first time
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance("SC-1"));
getContainersAndAssert(1, 1);
getContainersAndAssert(1, 1);
AllocateResponse allocateResponse =
interceptor.generateBaseAllocationResponse();
Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
AllocateResponse allocateResponse = interceptor.generateBaseAllocationResponse();
Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
// Let all SC timeout (home and SC-1), without an allocate from AM
Thread.sleep(800);
// Let all SC timeout (home and SC-1), without an allocate from AM
Thread.sleep(800);
// Should not be considered timeout, because there's no recent AM
// heartbeat
allocateResponse = interceptor.generateBaseAllocationResponse();
Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
// Should not be considered timeout, because there's no recent AM
// heartbeat
allocateResponse = interceptor.generateBaseAllocationResponse();
Assert.assertEquals(2, allocateResponse.getNumClusterNodes());
Assert.assertEquals(0, interceptor.getTimedOutSCs(true).size());
// Generate a duplicate heartbeat from AM, so that it won't really
// trigger a heartbeat to all SC
AllocateRequest allocateRequest =
Records.newRecord(AllocateRequest.class);
// Set to lastResponseId - 1 so that it will be considered a duplicate
// heartbeat and thus not forwarded to all SCs
allocateRequest.setResponseId(lastResponseId - 1);
interceptor.allocate(allocateRequest);
// Generate a duplicate heartbeat from AM, so that it won't really
// trigger a heartbeat to all SC
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
// Set to lastResponseId - 1 so that it will be considered a duplicate
// heartbeat and thus not forwarded to all SCs
allocateRequest.setResponseId(lastResponseId - 1);
interceptor.allocate(allocateRequest);
// Should be considered timeout
allocateResponse = interceptor.generateBaseAllocationResponse();
Assert.assertEquals(0, allocateResponse.getNumClusterNodes());
Assert.assertEquals(2, interceptor.getTimedOutSCs(true).size());
return null;
}
// Should be considered timeout
allocateResponse = interceptor.generateBaseAllocationResponse();
Assert.assertEquals(0, allocateResponse.getNumClusterNodes());
Assert.assertEquals(2, interceptor.getTimedOutSCs(true).size());
return null;
});
}
@ -834,87 +818,81 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
UserGroupInformation ugi =
interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
// Register the application
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
// Register the application
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate one batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
// Allocate one batch of containers
registerSubCluster(SubClusterId.newInstance("SC-1"));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
LOG.info("Allocated container " + c.getId());
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Preserve the mock RM instances for secondaries
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Increase the attemptId and create a new interceptor instance for it
attemptId = ApplicationAttemptId.newInstance(
attemptId.getApplicationId(), attemptId.getAttemptId() + 1);
interceptor = new TestableFederationInterceptor(null, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registry));
return null;
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
for (Container c : containers) {
LOG.info("Allocated container {}.", c.getId());
}
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Preserve the mock RM instances for secondaries
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Increase the attemptId and create a new interceptor instance for it
attemptId = ApplicationAttemptId.newInstance(
attemptId.getApplicationId(), attemptId.getAttemptId() + 1);
interceptor = new TestableFederationInterceptor(null, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext,
getConf(), attemptId, "test-user", null, null, null, registry));
return null;
});
// Update the ugi with new attemptId
ugi = interceptor.getUGIWithToken(interceptor.getAttemptId());
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
Assert.assertEquals(numberOfContainers,
registerResponse.getContainersFromPreviousAttempts().size());
int numberOfContainers = 3;
// Should re-attach secondaries and get the three running containers
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// SC1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
Assert.assertEquals(numberOfContainers,
registerResponse.getContainersFromPreviousAttempts().size());
// Release all containers
releaseContainersAndAssert(
registerResponse.getContainersFromPreviousAttempts());
// Release all containers
releaseContainersAndAssert(
registerResponse.getContainersFromPreviousAttempts());
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
// After the application succeeds, the registry entry should be deleted
if (interceptor.getRegistryClient() != null) {
Assert.assertEquals(0,
interceptor.getRegistryClient().getAllApplications().size());
}
return null;
// After the application succeeds, the registry entry should be deleted
if (interceptor.getRegistryClient() != null) {
Assert.assertEquals(0,
interceptor.getRegistryClient().getAllApplications().size());
}
return null;
});
}
@ -924,22 +902,17 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
ContainerStatus cStatus = Records.newRecord(ContainerStatus.class);
cStatus.setContainerId(cid);
Container container =
Container.newInstance(cid, null, null, null, null, null);
Container.newInstance(cid, null, null, null, null, null);
AllocateResponse homeResponse = Records.newRecord(AllocateResponse.class);
homeResponse.setAllocatedContainers(Collections.singletonList(container));
homeResponse.setCompletedContainersStatuses(
Collections.singletonList(cStatus));
homeResponse.setUpdatedNodes(
Collections.singletonList(Records.newRecord(NodeReport.class)));
homeResponse.setNMTokens(
Collections.singletonList(Records.newRecord(NMToken.class)));
homeResponse.setUpdatedContainers(
Collections.singletonList(
Records.newRecord(UpdatedContainer.class)));
homeResponse.setUpdateErrors(Collections
.singletonList(Records.newRecord(UpdateContainerError.class)));
homeResponse.setCompletedContainersStatuses(Collections.singletonList(cStatus));
homeResponse.setUpdatedNodes(Collections.singletonList(Records.newRecord(NodeReport.class)));
homeResponse.setNMTokens(Collections.singletonList(Records.newRecord(NMToken.class)));
homeResponse.setUpdatedContainers(Collections.singletonList(
Records.newRecord(UpdatedContainer.class)));
homeResponse.setUpdateErrors(Collections.singletonList(
Records.newRecord(UpdateContainerError.class)));
homeResponse.setAvailableResources(Records.newRecord(Resource.class));
homeResponse.setPreemptionMessage(createDummyPreemptionMessage(
ContainerId.newContainerId(attemptId, 0)));
@ -947,15 +920,12 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setAllocatedContainers(Collections.singletonList(container));
response.setCompletedContainersStatuses(Collections.singletonList(cStatus));
response.setUpdatedNodes(
Collections.singletonList(Records.newRecord(NodeReport.class)));
response.setNMTokens(
Collections.singletonList(Records.newRecord(NMToken.class)));
response.setUpdatedContainers(
Collections.singletonList(
Records.newRecord(UpdatedContainer.class)));
response.setUpdateErrors(Collections
.singletonList(Records.newRecord(UpdateContainerError.class)));
response.setUpdatedNodes(Collections.singletonList(Records.newRecord(NodeReport.class)));
response.setNMTokens(Collections.singletonList(Records.newRecord(NMToken.class)));
response.setUpdatedContainers(Collections.singletonList(
Records.newRecord(UpdatedContainer.class)));
response.setUpdateErrors(Collections.singletonList(
Records.newRecord(UpdateContainerError.class)));
response.setAvailableResources(Records.newRecord(Resource.class));
response.setPreemptionMessage(createDummyPreemptionMessage(
ContainerId.newContainerId(attemptId, 1)));
@ -964,14 +934,10 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
response, SubClusterId.newInstance("SC-1"));
Assert.assertEquals(2,
homeResponse.getPreemptionMessage().getContract()
.getContainers().size());
Assert.assertEquals(2,
homeResponse.getAllocatedContainers().size());
Assert.assertEquals(2,
homeResponse.getUpdatedNodes().size());
Assert.assertEquals(2,
homeResponse.getCompletedContainersStatuses().size());
homeResponse.getPreemptionMessage().getContract().getContainers().size());
Assert.assertEquals(2, homeResponse.getAllocatedContainers().size());
Assert.assertEquals(2, homeResponse.getUpdatedNodes().size());
Assert.assertEquals(2, homeResponse.getCompletedContainersStatuses().size());
}
private PreemptionMessage createDummyPreemptionMessage(

View File

@ -247,10 +247,10 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
private List<Container> getContainersAndAssert(int numberOfResourceRequests,
int numberOfAllocationExcepted) throws Exception {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
List<Container> containers = new ArrayList<Container>(numberOfResourceRequests);
List<ResourceRequest> askList = new ArrayList<ResourceRequest>(numberOfResourceRequests);
List<Container> containers = new ArrayList<>(numberOfResourceRequests);
List<ResourceRequest> askList = new ArrayList<>(numberOfResourceRequests);
for (int id = 0; id < numberOfResourceRequests; id++) {
askList.add(createResourceRequest("test-node-" + Integer.toString(id), 6000, 2, id % 5, 1));
askList.add(createResourceRequest("test-node-" + id, 6000, 2, id % 5, 1));
}
allocateRequest.setAskList(askList);
@ -280,9 +280,9 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
interceptor.drainAllAsyncQueue(false);
containers.addAll(allocateResponse.getAllocatedContainers());
LOG.info("Number of allocated containers in this request: "
+ Integer.toString(allocateResponse.getAllocatedContainers().size()));
LOG.info("Total number of allocated containers: {}", containers.size());
LOG.info("Number of allocated containers in this request: {}.",
allocateResponse.getAllocatedContainers().size());
LOG.info("Total number of allocated containers: {}.", containers.size());
Thread.sleep(10);
}
Assert.assertEquals(numberOfAllocationExcepted, containers.size());
@ -293,7 +293,7 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
throws Exception {
Assert.assertTrue(containers.size() > 0);
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
List<ContainerId> relList = new ArrayList<ContainerId>(containers.size());
List<ContainerId> relList = new ArrayList<>(containers.size());
for (Container container : containers) {
relList.add(container.getId());
}
@ -308,9 +308,9 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
// The release request will be split and handled by the corresponding UAM.
// The release containers returned by the mock resource managers will be
// aggregated and returned back to us and we can check if total request size
// aggregated and returned back to us, and we can check if total request size
// and returned size are the same
List<ContainerId> containersForReleasedContainerIds = new ArrayList<ContainerId>();
List<ContainerId> containersForReleasedContainerIds = new ArrayList<>();
List<ContainerId> newlyFinished = getCompletedContainerIds(
allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
@ -331,8 +331,9 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
newlyFinished = getCompletedContainerIds(allocateResponse.getCompletedContainersStatuses());
containersForReleasedContainerIds.addAll(newlyFinished);
LOG.info("Number of containers received in this request: ", newlyFinished.size());
LOG.info("Total number of containers received: ", containersForReleasedContainerIds.size());
LOG.info("Number of containers received in this request: {}.", newlyFinished.size());
LOG.info("Total number of containers received: {}.",
containersForReleasedContainerIds.size());
Thread.sleep(10);
}
@ -342,7 +343,7 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
private void checkAMRMToken(Token amrmToken) {
if (amrmToken != null) {
// The token should be the one issued by home MockRM
Assert.assertTrue(amrmToken.getKind().equals(Integer.toString(0)));
Assert.assertEquals(Integer.toString(0), amrmToken.getKind());
}
}
@ -360,106 +361,103 @@ public class TestFederationInterceptorSecure extends BaseAMRMProxyTest {
final RegistryOperations registryObj)
throws Exception {
UserGroupInformation ugi = this.getUGIWithToken(attemptId);
ugi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
interceptor = new TestableFederationInterceptor(mockHomeRm);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId,
"test-user", null, null, null, registryObj));
interceptor.cleanupRegistry();
ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
interceptor = new TestableFederationInterceptor(mockHomeRm);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId,
"test-user", null, null, null, registryObj));
interceptor.cleanupRegistry();
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
// Register the application
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
RegisterApplicationMasterResponse registerResponse =
interceptor.registerApplicationMaster(registerReq);
Assert.assertNotNull(registerResponse);
lastResponseId = 0;
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
Assert.assertEquals(0, interceptor.getUnmanagedAMPoolSize());
// Allocate one batch of containers
registerSubCluster(SubClusterId.newInstance(SC_ID1));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
// Allocate one batch of containers
registerSubCluster(SubClusterId.newInstance(SC_ID1));
registerSubCluster(SubClusterId.newInstance(HOME_SC_ID));
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
int numberOfContainers = 3;
List<Container> containers =
getContainersAndAssert(numberOfContainers, numberOfContainers * 2);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Make sure all async hb threads are done
interceptor.drainAllAsyncQueue(true);
// Prepare for Federation Interceptor restart and recover
Map<String, byte[]> recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId);
String scEntry = FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1";
if (registryObj == null) {
Assert.assertTrue(recoveredDataMap.containsKey(scEntry));
} else {
// When AMRMPRoxy HA is enabled, NMSS should not have the UAM token,
// it should be in Registry
Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
}
// Preserve the mock RM instances
MockResourceManagerFacade homeRM = interceptor.getHomeRM();
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Create a new interceptor instance and recover
interceptor = new TestableFederationInterceptor(homeRM, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId,
"test-user", null, null, null, registryObj));
interceptor.setClientRPC(true);
interceptor.recover(recoveredDataMap);
interceptor.setClientRPC(false);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// SC-1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
// The first allocate call expects a fail-over exception and re-register
try {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
lastResponseId = allocateResponse.getResponseId();
Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
+ " after FederationInterceptor restarts and recovers");
} catch (ApplicationMasterNotRegisteredException e) {
}
interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
// Release all containers
releaseContainersAndAssert(containers);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertEquals(true, finishResponse.getIsUnregistered());
// After the application succeeds, the registry/NMSS entry should be
// cleaned up
if (registryObj != null) {
Assert.assertEquals(0, interceptor.getRegistryClient().getAllApplications().size());
} else {
recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId);
Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
}
return null;
// Prepare for Federation Interceptor restart and recover
Map<String, byte[]> recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId);
String scEntry = FederationInterceptor.NMSS_SECONDARY_SC_PREFIX + "SC-1";
if (registryObj == null) {
Assert.assertTrue(recoveredDataMap.containsKey(scEntry));
} else {
// When AMRMPRoxy HA is enabled, NMSS should not have the UAM token,
// it should be in Registry
Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
}
// Preserve the mock RM instances
MockResourceManagerFacade homeRM = interceptor.getHomeRM();
ConcurrentHashMap<String, MockResourceManagerFacade> secondaries =
interceptor.getSecondaryRMs();
// Create a new interceptor instance and recover
interceptor = new TestableFederationInterceptor(homeRM, secondaries);
interceptor.init(new AMRMProxyApplicationContextImpl(nmContext, conf, attemptId,
"test-user", null, null, null, registryObj));
interceptor.setClientRPC(true);
interceptor.recover(recoveredDataMap);
interceptor.setClientRPC(false);
Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize());
// SC-1 should be initialized to be timed out
Assert.assertEquals(1, interceptor.getTimedOutSCs(true).size());
// The first allocate call expects a fail-over exception and re-register
try {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
allocateRequest.setResponseId(lastResponseId);
AllocateResponse allocateResponse = interceptor.allocate(allocateRequest);
lastResponseId = allocateResponse.getResponseId();
Assert.fail("Expecting an ApplicationMasterNotRegisteredException "
+ " after FederationInterceptor restarts and recovers");
} catch (ApplicationMasterNotRegisteredException e) {
}
interceptor.registerApplicationMaster(registerReq);
lastResponseId = 0;
// Release all containers
releaseContainersAndAssert(containers);
// Finish the application
FinishApplicationMasterRequest finishReq =
Records.newRecord(FinishApplicationMasterRequest.class);
finishReq.setDiagnostics("");
finishReq.setTrackingUrl("");
finishReq.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
FinishApplicationMasterResponse finishResponse =
interceptor.finishApplicationMaster(finishReq);
Assert.assertNotNull(finishResponse);
Assert.assertTrue(finishResponse.getIsUnregistered());
// After the application succeeds, the registry/NMSS entry should be
// cleaned up
if (registryObj != null) {
Assert.assertEquals(0, interceptor.getRegistryClient().getAllApplications().size());
} else {
recoveredDataMap = recoverDataMapForAppAttempt(nmStateStore, attemptId);
Assert.assertFalse(recoveredDataMap.containsKey(scEntry));
}
return null;
});
}

View File

@ -77,7 +77,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
}
@Override
protected AMHeartbeatRequestHandler createHomeHeartbeartHandler(
protected AMHeartbeatRequestHandler createHomeHeartbeatHandler(
Configuration conf, ApplicationId appId,
AMRMClientRelayer rmProxyRelayer) {
return new TestableAMRequestHandlerThread(conf, appId, rmProxyRelayer);
@ -142,7 +142,7 @@ public class TestableFederationInterceptor extends FederationInterceptor {
}
/**
* Drain all aysnc heartbeat threads, comes in two favors:
* Drain all async heartbeat threads, comes in two favors:
*
* 1. waitForAsyncHBThreadFinish == false. Only wait for the async threads to
* pick up all pending heartbeat requests. Not necessarily wait for all
@ -159,9 +159,9 @@ public class TestableFederationInterceptor extends FederationInterceptor {
LOG.info("waiting to drain home heartbeat handler");
if (waitForAsyncHBThreadFinish) {
getHomeHeartbeartHandler().drainHeartbeatThread();
getHomeHeartbeatHandler().drainHeartbeatThread();
} else {
while (getHomeHeartbeartHandler().getRequestQueueSize() > 0) {
while (getHomeHeartbeatHandler().getRequestQueueSize() > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
@ -291,12 +291,9 @@ public class TestableFederationInterceptor extends FederationInterceptor {
public void run() {
try {
getUGIWithToken(getAttemptId())
.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() {
TestableAMRequestHandlerThread.super.run();
return null;
}
.doAs((PrivilegedExceptionAction<Object>) () -> {
TestableAMRequestHandlerThread.super.run();
return null;
});
} catch (Exception e) {
}