diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 97ef388f16..aadfbe1676 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -671,6 +671,10 @@ Release 2.1.0-beta - 2013-07-02 YARN-845. RM crash with NPE on NODE_UPDATE (Mayank Bansal via bikas) + YARN-369. Handle ( or throw a proper error when receiving) status updates + from application masters that have not registered (Mayank Bansal & + Abhishek Kapoor via bikas) + BREAKDOWN OF HADOOP-8562 SUBTASKS AND RELATED JIRAS YARN-158. Yarn creating package-info.java must not depend on sh. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index f6b45232f1..d5f0a67fef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -195,17 +195,35 @@ public RegisterApplicationMasterResponse registerApplicationMaster( // Allow only one thread in AM to do registerApp at a time. synchronized (lastResponse) { - LOG.info("AM registration " + applicationAttemptId); + if (hasApplicationMasterRegistered(applicationAttemptId)) { + String message = + "Application Master is already registered : " + + applicationAttemptId.getApplicationId(); + LOG.warn(message); + RMAuditLogger.logFailure( + this.rmContext.getRMApps() + .get(applicationAttemptId.getApplicationId()).getUser(), + AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message, + applicationAttemptId.getApplicationId(), applicationAttemptId); + throw new InvalidApplicationMasterRequestException(message); + } + this.amLivelinessMonitor.receivedPing(applicationAttemptId); - - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppAttemptRegistrationEvent(applicationAttemptId, request - .getHost(), request.getRpcPort(), request.getTrackingUrl())); - RMApp app = this.rmContext.getRMApps().get(appID); - RMAuditLogger.logSuccess(app.getUser(), - AuditConstants.REGISTER_AM, "ApplicationMasterService", appID, - applicationAttemptId); + + // Setting the response id to 0 to identify if the + // application master is register for the respective attemptid + lastResponse.setResponseId(0); + responseMap.put(applicationAttemptId, lastResponse); + LOG.info("AM registration " + applicationAttemptId); + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppAttemptRegistrationEvent(applicationAttemptId, request + .getHost(), request.getRpcPort(), request.getTrackingUrl())); + RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM, + "ApplicationMasterService", appID, applicationAttemptId); // Pick up min/max resource from scheduler... RegisterApplicationMasterResponse response = recordFactory @@ -257,6 +275,24 @@ public FinishApplicationMasterResponse finishApplicationMaster( } } + /** + * @param appAttemptId + * @return true if application is registered for the respective attemptid + */ + public boolean hasApplicationMasterRegistered( + ApplicationAttemptId appAttemptId) { + boolean hasApplicationMasterRegistered = false; + AllocateResponse lastResponse = responseMap.get(appAttemptId); + if (lastResponse != null) { + synchronized (lastResponse) { + if (lastResponse.getResponseId() >= 0) { + hasApplicationMasterRegistered = true; + } + } + } + return hasApplicationMasterRegistered; + } + @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { @@ -272,6 +308,20 @@ public AllocateResponse allocate(AllocateRequest request) LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); return resync; } + + if (!hasApplicationMasterRegistered(appAttemptId)) { + String message = + "Application Master is trying to allocate before registering for: " + + appAttemptId.getApplicationId(); + LOG.error(message); + RMAuditLogger.logFailure( + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getUser(), AuditConstants.REGISTER_AM, "", + "ApplicationMasterService", message, appAttemptId.getApplicationId(), + appAttemptId); + throw new InvalidApplicationMasterRequestException(message); + } + if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { /* old heartbeat */ return lastResponse; @@ -442,7 +492,9 @@ private PreemptionMessage generatePreemptionMessage(Allocation allocation){ public void registerAppAttempt(ApplicationAttemptId attemptId) { AllocateResponse response = recordFactory.newRecordInstance(AllocateResponse.class); - response.setResponseId(0); + // set response id to -1 before application master for the following + // attemptID get registered + response.setResponseId(-1); LOG.info("Registering app attempt : " + attemptId); responseMap.put(attemptId, response); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java new file mode 100644 index 0000000000..f0e97f5828 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/InvalidApplicationMasterRequestException.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * The exception is thrown when an application Master call allocate without + * calling RegisterApplicationMaster or try to register more then once. + */ +public class InvalidApplicationMasterRequestException extends YarnException { + + private static final long serialVersionUID = 1357686L; + + public InvalidApplicationMasterRequestException(Throwable cause) { + super(cause); + } + + public InvalidApplicationMasterRequestException(String message) { + super(message); + } + + public InvalidApplicationMasterRequestException(String message, + Throwable cause) { + super(message, cause); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index c4565d32bb..ae631b1cab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -78,10 +78,19 @@ public void waitForState(RMAppAttemptState finalState) throws Exception { finalState, attempt.getAppAttemptState()); } - public RegisterApplicationMasterResponse registerAppAttempt() throws Exception { - waitForState(RMAppAttemptState.LAUNCHED); + public RegisterApplicationMasterResponse registerAppAttempt() + throws Exception { + return registerAppAttempt(true); + } + + public RegisterApplicationMasterResponse registerAppAttempt(boolean wait) + throws Exception { + if (wait) { + waitForState(RMAppAttemptState.LAUNCHED); + } responseId = 0; - RegisterApplicationMasterRequest req = Records.newRecord(RegisterApplicationMasterRequest.class); + RegisterApplicationMasterRequest req = + Records.newRecord(RegisterApplicationMasterRequest.class); req.setApplicationAttemptId(attemptId); req.setHost(""); req.setRpcPort(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 792d510b75..ad55b0c22e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; +import java.util.ArrayList; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ContainerManagementProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; @@ -34,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -166,4 +169,58 @@ public void testAMLaunchAndCleanup() throws Exception { am.waitForState(RMAppAttemptState.FINISHED); rm.stop(); } + + + @SuppressWarnings("unused") + @Test(timeout = 100000) + public void testallocateBeforeAMRegistration() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + boolean thrown = false; + rootLogger.setLevel(Level.DEBUG); + MockRM rm = new MockRM(); + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 5000); + RMApp app = rm.submitApp(2000); + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + + // request for containers + int request = 2; + try { + AllocateResponse ar = + am.allocate("h1", 1000, request, new ArrayList()); + } catch (Exception e) { + Assert.assertEquals("Application Master is trying to allocate before " + + "registering for: " + attempt.getAppAttemptId().getApplicationId(), + e.getMessage()); + thrown = true; + } + // kick the scheduler + nm1.nodeHeartbeat(true); + try { + AllocateResponse amrs = + am.allocate(new ArrayList(), + new ArrayList()); + } catch (Exception e) { + Assert.assertEquals("Application Master is trying to allocate before " + + "registering for: " + attempt.getAppAttemptId().getApplicationId(), + e.getMessage()); + thrown = true; + } + Assert.assertTrue(thrown); + am.registerAppAttempt(); + thrown = false; + try { + am.registerAppAttempt(false); + } + catch (Exception e) { + Assert.assertEquals("Application Master is already registered : " + + attempt.getAppAttemptId().getApplicationId(), + e.getMessage()); + thrown = true; + } + Assert.assertTrue(thrown); + } }