diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 034f03c6fa..6825a36ebd 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -594,11 +594,9 @@
-
-
-
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 68c55ac1a9..e33d7e1977 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -246,6 +246,16 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
shouldReRegisterNext = false;
+ synchronized (applicationContainerIdMap) {
+ if (applicationContainerIdMap.containsKey(amrmToken)) {
+ throw new InvalidApplicationMasterRequestException(
+ AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
+ }
+ // Keep track of the containers that are returned to this application
+ applicationContainerIdMap.put(amrmToken, new ArrayList());
+ }
+
+ // Make sure we wait for certain test cases last in the method
synchronized (syncObj) {
syncObj.notifyAll();
// We reuse the port number to indicate whether the unit test want us to
@@ -261,14 +271,6 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
}
}
- synchronized (applicationContainerIdMap) {
- if (applicationContainerIdMap.containsKey(amrmToken)) {
- throw new InvalidApplicationMasterRequestException(
- AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
- }
- // Keep track of the containers that are returned to this application
- applicationContainerIdMap.put(amrmToken, new ArrayList());
- }
return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
null, request.getHost(), null);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
index ffe47f4bd6..28724aaf25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java
@@ -208,22 +208,25 @@ public void init(AMRMProxyApplicationContext appContext) {
* requests from AM because of timeout between AM and AMRMProxy, which is
* shorter than the timeout + failOver between FederationInterceptor
* (AMRMProxy) and RM.
+ *
+ * For the same reason, this method needs to be synchronized.
*/
@Override
- public RegisterApplicationMasterResponse registerApplicationMaster(
- RegisterApplicationMasterRequest request)
- throws YarnException, IOException {
+ public synchronized RegisterApplicationMasterResponse
+ registerApplicationMaster(RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
// If AM is calling with a different request, complain
- if (this.amRegistrationRequest != null
- && !this.amRegistrationRequest.equals(request)) {
- throw new YarnException("A different request body recieved. AM should"
- + " not call registerApplicationMaster with different request body");
+ if (this.amRegistrationRequest != null) {
+ if (!this.amRegistrationRequest.equals(request)) {
+ throw new YarnException("AM should not call "
+ + "registerApplicationMaster with a different request body");
+ }
+ } else {
+ // Save the registration request. This will be used for registering with
+ // secondary sub-clusters using UAMs, as well as re-register later
+ this.amRegistrationRequest = request;
}
- // Save the registration request. This will be used for registering with
- // secondary sub-clusters using UAMs, as well as re-register later
- this.amRegistrationRequest = request;
-
/*
* Present to AM as if we are the RM that never fails over. When actual RM
* fails over, we always re-register automatically.
@@ -245,22 +248,8 @@ public RegisterApplicationMasterResponse registerApplicationMaster(
* is running and will breaks the elasticity feature. The registration with
* the other sub-cluster RM will be done lazily as needed later.
*/
- try {
- this.amRegistrationResponse =
- this.homeRM.registerApplicationMaster(request);
- } catch (InvalidApplicationMasterRequestException e) {
- if (e.getMessage()
- .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
- // Some other register thread might have succeeded in the meantime
- if (this.amRegistrationResponse != null) {
- LOG.info("Other concurrent thread registered successfully, "
- + "simply return the same success register response");
- return this.amRegistrationResponse;
- }
- }
- // This is a real issue, throw back to AM
- throw e;
- }
+ this.amRegistrationResponse =
+ this.homeRM.registerApplicationMaster(request);
// the queue this application belongs will be used for getting
// AMRMProxy policy from state store.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
index 4e15323460..34b07416a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java
@@ -21,6 +21,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -36,6 +41,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
@@ -234,7 +240,7 @@ public void testMultipleSubClusters() throws Exception {
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
@@ -298,7 +304,7 @@ public void testReregister() throws Exception {
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
@@ -338,6 +344,78 @@ public void testReregister() throws Exception {
Assert.assertEquals(true, finshResponse.getIsUnregistered());
}
+ /*
+ * Test concurrent register threads. This is possible because the timeout
+ * between AM and AMRMProxy is shorter than the timeout + failOver between
+ * FederationInterceptor (AMRMProxy) and RM. When first call is blocked due to
+ * RM failover and AM timeout, it will call us resulting in a second register
+ * thread.
+ */
+ @Test(timeout = 5000)
+ public void testConcurrentRegister()
+ throws InterruptedException, ExecutionException {
+ ExecutorService threadpool = Executors.newCachedThreadPool();
+ ExecutorCompletionService compSvc =
+ new ExecutorCompletionService<>(threadpool);
+
+ Object syncObj = MockResourceManagerFacade.getSyncObj();
+
+ // Two register threads
+ synchronized (syncObj) {
+ // Make sure first thread will block within RM, before the second thread
+ // starts
+ LOG.info("Starting first register thread");
+ compSvc.submit(new ConcurrentRegisterAMCallable());
+
+ try {
+ LOG.info("Test main starts waiting for the first thread to block");
+ syncObj.wait();
+ LOG.info("Test main wait finished");
+ } catch (Exception e) {
+ LOG.info("Test main wait interrupted", e);
+ }
+ }
+
+ // The second thread will get already registered exception from RM.
+ LOG.info("Starting second register thread");
+ compSvc.submit(new ConcurrentRegisterAMCallable());
+
+ // Notify the first register thread to return
+ LOG.info("Let first blocked register thread move on");
+ synchronized (syncObj) {
+ syncObj.notifyAll();
+ }
+
+ // Both thread should return without exception
+ RegisterApplicationMasterResponse response = compSvc.take().get();
+ Assert.assertNotNull(response);
+
+ response = compSvc.take().get();
+ Assert.assertNotNull(response);
+
+ threadpool.shutdown();
+ }
+
+ /**
+ * A callable that calls registerAM to RM with blocking.
+ */
+ public class ConcurrentRegisterAMCallable
+ implements Callable {
+ @Override
+ public RegisterApplicationMasterResponse call() throws Exception {
+ RegisterApplicationMasterResponse response = null;
+ try {
+ // Use port number 1001 to let mock RM block in the register call
+ response = interceptor.registerApplicationMaster(
+ RegisterApplicationMasterRequest.newInstance(null, 1001, null));
+ } catch (Exception e) {
+ LOG.info("Register thread exception", e);
+ response = null;
+ }
+ return response;
+ }
+ }
+
@Test
public void testRequestInterceptorChainCreation() throws Exception {
RequestInterceptor root =
@@ -381,7 +459,7 @@ public void testTwoIdenticalRegisterRequest() throws Exception {
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
for (int i = 0; i < 2; i++) {
@@ -397,7 +475,7 @@ public void testTwoDifferentRegisterRequest() throws Exception {
RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse =
@@ -407,7 +485,7 @@ public void testTwoDifferentRegisterRequest() throws Exception {
// Register the application second time with a different request obj
registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId));
- registerReq.setRpcPort(testAppId);
+ registerReq.setRpcPort(0);
registerReq.setTrackingUrl("different");
try {
registerResponse = interceptor.registerApplicationMaster(registerReq);