From 0f3b6900be1a3b2e4624f31f84656f4a32dadce9 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Thu, 23 Oct 2014 21:56:03 -0700 Subject: [PATCH] YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and made related MR changes. Contributed by Jian He. --- .../v2/app/local/LocalContainerAllocator.java | 43 +++++++---------- .../v2/app/rm/RMContainerAllocator.java | 46 ++++++++---------- .../v2/app/rm/RMContainerRequestor.java | 23 ++------- .../v2/app/rm/TestRMContainerAllocator.java | 32 +++++++------ hadoop-yarn-project/CHANGES.txt | 3 ++ .../api/protocolrecords/AllocateResponse.java | 2 +- .../hadoop/yarn/api/records/AMCommand.java | 26 +++++++--- .../ApplicationAttemptNotFoundException.java | 6 ++- .../api/async/impl/AMRMClientAsyncImpl.java | 40 +++++----------- .../yarn/client/api/impl/AMRMClientImpl.java | 16 +++---- .../yarn/client/ProtocolHATestBase.java | 2 +- .../api/async/impl/TestAMRMClientAsync.java | 29 +++++------- .../yarn/client/api/impl/TestAMRMClient.java | 7 +-- .../yarn/api/records/impl/pb/ProtoUtils.java | 3 +- .../util/resource/ResourceCalculator.java | 5 -- .../hadoop/yarn/api/TestAllocateResponse.java | 6 +-- .../yarn/server/utils/BuilderUtils.java | 2 +- .../ApplicationMasterService.java | 47 +++++++++---------- .../server/resourcemanager/RMAuditLogger.java | 1 + .../TestApplicationMasterLauncher.java | 41 +++++++++------- .../server/resourcemanager/TestRMRestart.java | 15 +++--- .../TestAMRMRPCResponseId.java | 14 ++++-- .../security/TestAMRMTokens.java | 2 + 23 files changed, 196 insertions(+), 215 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java index 4d92c006bf..19efe17135 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java @@ -36,12 +36,13 @@ import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -98,11 +99,24 @@ protected synchronized void heartbeat() throws Exception { AllocateRequest.newInstance(this.lastResponseID, super.getApplicationProgress(), new ArrayList(), new ArrayList(), null); - AllocateResponse allocateResponse; try { - allocateResponse = scheduler.allocate(allocateRequest); + scheduler.allocate(allocateRequest); // Reset retry count if no exception occurred. retrystartTime = System.currentTimeMillis(); + } catch (ApplicationAttemptNotFoundException e) { + LOG.info("Event from RM: shutting down Application Master"); + // This can happen if the RM has been restarted. If it is in that state, + // this application must clean itself up. + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.JOB_AM_REBOOT)); + throw new YarnRuntimeException( + "Resource Manager doesn't recognize AttemptId: " + + this.getContext().getApplicationID(), e); + } catch (ApplicationMasterNotRegisteredException e) { + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resync and send outstanding requests."); + this.lastResponseID = 0; + register(); } catch (Exception e) { // This can happen when the connection to the RM has gone down. Keep // re-trying until the retryInterval has expired. @@ -117,29 +131,6 @@ protected synchronized void heartbeat() throws Exception { // continue to attempt to contact the RM. throw e; } - if (allocateResponse.getAMCommand() != null) { - switch(allocateResponse.getAMCommand()) { - case AM_RESYNC: - LOG.info("ApplicationMaster is out of sync with ResourceManager," - + " hence resyncing."); - this.lastResponseID = 0; - register(); - break; - case AM_SHUTDOWN: - LOG.info("Event from RM: shutting down Application Master"); - // This can happen if the RM has been restarted. If it is in that state, - // this application must clean itself up. - eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.JOB_AM_REBOOT)); - throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " + - this.getContext().getApplicationID()); - default: - String msg = - "Unhandled value of AMCommand: " + allocateResponse.getAMCommand(); - LOG.error(msg); - throw new YarnRuntimeException(msg); - } - } } @SuppressWarnings("unchecked") diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index fb8771af6a..e0689971dc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -76,6 +76,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.NMTokenCache; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; @@ -246,7 +248,7 @@ public void run() { protected synchronized void heartbeat() throws Exception { scheduleStats.updateAndLogIfChanged("Before Scheduling: "); List allocatedContainers = getResources(); - if (allocatedContainers.size() > 0) { + if (allocatedContainers != null && allocatedContainers.size() > 0) { scheduledRequests.assign(allocatedContainers); } @@ -675,6 +677,22 @@ private List getResources() throws Exception { response = makeRemoteRequest(); // Reset retry count if no exception occurred. retrystartTime = System.currentTimeMillis(); + } catch (ApplicationAttemptNotFoundException e ) { + // This can happen if the RM has been restarted. If it is in that state, + // this application must clean itself up. + eventHandler.handle(new JobEvent(this.getJob().getID(), + JobEventType.JOB_AM_REBOOT)); + throw new YarnRuntimeException( + "Resource Manager doesn't recognize AttemptId: " + + this.getContext().getApplicationID(), e); + } catch (ApplicationMasterNotRegisteredException e) { + LOG.info("ApplicationMaster is out of sync with ResourceManager," + + " hence resync and send outstanding requests."); + // RM may have restarted, re-register with RM. + lastResponseID = 0; + register(); + addOutstandingRequestOnResync(); + return null; } catch (Exception e) { // This can happen when the connection to the RM has gone down. Keep // re-trying until the retryInterval has expired. @@ -689,32 +707,6 @@ private List getResources() throws Exception { // continue to attempt to contact the RM. throw e; } - if (response.getAMCommand() != null) { - switch(response.getAMCommand()) { - case AM_RESYNC: - LOG.info("ApplicationMaster is out of sync with ResourceManager," - + " hence resyncing."); - lastResponseID = 0; - - // Registering to allow RM to discover an active AM for this - // application - register(); - addOutstandingRequestOnResync(); - break; - case AM_SHUTDOWN: - // This can happen if the RM has been restarted. If it is in that state, - // this application must clean itself up. - eventHandler.handle(new JobEvent(this.getJob().getID(), - JobEventType.JOB_AM_REBOOT)); - throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " + - this.getContext().getApplicationID()); - default: - String msg = - "Unhandled value of AMCommand: " + response.getAMCommand(); - LOG.error(msg); - throw new YarnRuntimeException(msg); - } - } Resource newHeadRoom = getAvailableResources() == null ? Resources.none() : getAvailableResources(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 943c0af0d9..bb9ad029f3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -40,7 +39,6 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -51,6 +49,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import com.google.common.annotations.VisibleForTesting; + /** * Keeps the data structures to send container requests to RM. @@ -176,7 +176,8 @@ protected void serviceInit(Configuration conf) throws Exception { LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); } - protected AllocateResponse makeRemoteRequest() throws IOException { + protected AllocateResponse makeRemoteRequest() throws YarnException, + IOException { ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest.newInstance(new ArrayList(blacklistAdditions), new ArrayList(blacklistRemovals)); @@ -184,16 +185,7 @@ protected AllocateResponse makeRemoteRequest() throws IOException { AllocateRequest.newInstance(lastResponseID, super.getApplicationProgress(), new ArrayList(ask), new ArrayList(release), blacklistRequest); - AllocateResponse allocateResponse; - try { - allocateResponse = scheduler.allocate(allocateRequest); - } catch (YarnException e) { - throw new IOException(e); - } - - if (isResyncCommand(allocateResponse)) { - return allocateResponse; - } + AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); lastResponseID = allocateResponse.getResponseId(); availableResources = allocateResponse.getAvailableResources(); lastClusterNmCount = clusterNmCount; @@ -222,11 +214,6 @@ protected AllocateResponse makeRemoteRequest() throws IOException { return allocateResponse; } - protected boolean isResyncCommand(AllocateResponse allocateResponse) { - return allocateResponse.getAMCommand() != null - && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; - } - protected void addOutstandingRequestOnResync() { for (Map> rr : remoteRequestsTable .values()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 341e67354a..fdc456a708 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -31,15 +31,17 @@ import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.mapreduce.v2.app.AppContext; -import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; -import org.apache.hadoop.mapreduce.v2.app.MRApp; -import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; -import org.junit.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -50,6 +52,9 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; +import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; @@ -87,9 +92,11 @@ import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -103,10 +110,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -1752,14 +1761,11 @@ public void updateSchedulerProxy(MyResourceManager rm) { } @Override - protected AllocateResponse makeRemoteRequest() throws IOException { + protected AllocateResponse makeRemoteRequest() throws IOException, + YarnException { allocateResponse = super.makeRemoteRequest(); return allocateResponse; } - - public boolean isResyncCommand() { - return super.isResyncCommand(allocateResponse); - } } @Test @@ -2255,8 +2261,6 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart() // send allocate request to 2nd RM and get resync command allocator.schedule(); dispatcher.await(); - Assert.assertTrue("Last allocate response is not RESYNC", - allocator.isResyncCommand()); // Step-5 : On Resync,AM sends all outstanding // asks,release,blacklistAaddition diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index ba6de1e2dc..bc93be152e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -388,6 +388,9 @@ Release 2.6.0 - UNRELEASED YARN-2709. Made timeline client getDelegationToken API retry if ConnectException happens. (Li Lu via zjshen) + YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and + made related MR changes. (Jian He via zjshen) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index e56ba6156b..46ac6428a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -86,7 +86,7 @@ public static AllocateResponse newInstance(int responseId, response.setNMTokens(nmTokens); return response; } - + @Public @Stable public static AllocateResponse newInstance(int responseId, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java index 5c9a985972..7e8ba2a37c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMCommand.java @@ -20,7 +20,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; /** * Command sent by the Resource Manager to the Application Master in the @@ -30,16 +34,26 @@ @Public @Unstable public enum AMCommand { + /** - * Sent by Resource Manager when it is out of sync with the AM and wants the - * AM get back in sync. + * @deprecated Sent by Resource Manager when it is out of sync with the AM and + * wants the AM get back in sync. + * + * Note: Instead of sending this command, + * {@link ApplicationMasterNotRegisteredException} will be thrown + * when ApplicationMaster is out of sync with ResourceManager and + * ApplicationMaster is expected to re-register with RM by calling + * {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)} */ AM_RESYNC, - + /** - * Sent by Resource Manager when it wants the AM to shutdown. Eg. when the - * node is going down for maintenance. The AM should save any state and - * prepare to be restarted at a later time. + * @deprecated Sent by Resource Manager when it wants the AM to shutdown. + * Note: This command was earlier sent by ResourceManager to + * instruct AM to shutdown if RM had restarted. Now + * {@link ApplicationAttemptNotFoundException} will be thrown in case + * that RM has restarted and AM is supposed to handle this + * exception by shutting down itself. */ AM_SHUTDOWN } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationAttemptNotFoundException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationAttemptNotFoundException.java index 59ebf5bc40..f2ec38b022 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationAttemptNotFoundException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ApplicationAttemptNotFoundException.java @@ -21,12 +21,16 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; /** * This exception is thrown on * {@link ApplicationHistoryProtocol#getApplicationAttemptReport (GetApplicationAttemptReportRequest)} - * API when the Application Attempt doesn't exist in Application History Server + * API when the Application Attempt doesn't exist in Application History Server or + * {@link ApplicationMasterProtocol#allocate(AllocateRequest)} if application + * doesn't exist in RM. */ @Public @Unstable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java index e7659bd3df..82768bbf61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java @@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -43,6 +42,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -219,9 +219,13 @@ public void run() { if (!keepRunning) { return; } - + try { response = client.allocate(progress); + } catch (ApplicationAttemptNotFoundException e) { + handler.onShutdownRequest(); + LOG.info("Shutdown requested. Stopping callback."); + return; } catch (Throwable ex) { LOG.error("Exception on heartbeat", ex); savedException = ex; @@ -229,21 +233,17 @@ public void run() { handlerThread.interrupt(); return; } - } - if (response != null) { - while (true) { - try { - responseQueue.put(response); - if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) { - return; + if (response != null) { + while (true) { + try { + responseQueue.put(response); + break; + } catch (InterruptedException ex) { + LOG.debug("Interrupted while waiting to put on response queue", ex); } - break; - } catch (InterruptedException ex) { - LOG.debug("Interrupted while waiting to put on response queue", ex); } } } - try { Thread.sleep(heartbeatIntervalMs.get()); } catch (InterruptedException ex) { @@ -276,20 +276,6 @@ public void run() { LOG.info("Interrupted while waiting for queue", ex); continue; } - - if (response.getAMCommand() != null) { - switch(response.getAMCommand()) { - case AM_SHUTDOWN: - handler.onShutdownRequest(); - LOG.info("Shutdown requested. Stopping callback."); - return; - default: - String msg = - "Unhandled value of RM AMCommand: " + response.getAMCommand(); - LOG.error(msg); - throw new YarnRuntimeException(msg); - } - } List updatedNodes = response.getUpdatedNodes(); if (!updatedNodes.isEmpty()) { handler.onNodesUpdated(updatedNodes); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index e5e32e9519..071c1eeaf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -275,15 +274,16 @@ public AllocateResponse allocate(float progressIndicator) blacklistRemovals.clear(); } - allocateResponse = rmClient.allocate(allocateRequest); - if (isResyncCommand(allocateResponse)) { + try { + allocateResponse = rmClient.allocate(allocateRequest); + } catch (ApplicationMasterNotRegisteredException e) { LOG.warn("ApplicationMaster is out of sync with ResourceManager," + " hence resyncing."); synchronized (this) { release.addAll(this.pendingRelease); blacklistAdditions.addAll(this.blacklistedNodes); for (Map> rr : remoteRequestsTable - .values()) { + .values()) { for (Map capabalities : rr.values()) { for (ResourceRequestInfo request : capabalities.values()) { addResourceRequestToAsk(request.remoteRequest); @@ -293,7 +293,8 @@ public AllocateResponse allocate(float progressIndicator) } // re register with RM registerApplicationMaster(); - return allocate(progressIndicator); + allocateResponse = allocate(progressIndicator); + return allocateResponse; } synchronized (this) { @@ -349,11 +350,6 @@ protected void removePendingReleaseRequests( } } - private boolean isResyncCommand(AllocateResponse allocateResponse) { - return allocateResponse.getAMCommand() != null - && allocateResponse.getAMCommand() == AMCommand.AM_RESYNC; - } - @Private @VisibleForTesting protected void populateNMTokens(List nmTokens) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java index 1cda9d5b2a..512cd6b213 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java @@ -827,7 +827,7 @@ public AllocateResponse createFakeAllocateResponse() { return AllocateResponse.newInstance(-1, new ArrayList(), new ArrayList(), new ArrayList(), - Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1, + Resource.newInstance(1024, 2), null, 1, null, new ArrayList()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java index 79f53fc4f4..b00598a5d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java @@ -18,16 +18,15 @@ package org.apache.hadoop.yarn.client.api.async.impl; -import com.google.common.base.Supplier; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.ArrayList; @@ -36,13 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -56,12 +52,16 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import com.google.common.base.Supplier; + public class TestAMRMClientAsync { private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class); @@ -211,10 +211,10 @@ public void testAMRMClientAsyncShutDown() throws Exception { @SuppressWarnings("unchecked") AMRMClient client = mock(AMRMClientImpl.class); - final AllocateResponse shutDownResponse = createAllocateResponse( - new ArrayList(), new ArrayList(), null); - shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); - when(client.allocate(anyFloat())).thenReturn(shutDownResponse); + createAllocateResponse(new ArrayList(), + new ArrayList(), null); + when(client.allocate(anyFloat())).thenThrow( + new ApplicationAttemptNotFoundException("app not found, shut down")); AMRMClientAsync asyncClient = AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler); @@ -235,11 +235,8 @@ public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception { final TestCallbackHandler callbackHandler = new TestCallbackHandler(); @SuppressWarnings("unchecked") AMRMClient client = mock(AMRMClientImpl.class); - - final AllocateResponse shutDownResponse = createAllocateResponse( - new ArrayList(), new ArrayList(), null); - shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); - when(client.allocate(anyFloat())).thenReturn(shutDownResponse); + when(client.allocate(anyFloat())).thenThrow( + new ApplicationAttemptNotFoundException("app not found, shut down")); AMRMClientAsync asyncClient = AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java index 35e6635e32..e24b5f6491 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java @@ -932,7 +932,7 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException, Assert.assertNotEquals(amrmToken_1, amrmToken_2); // can do the allocate call with latest AMRMToken - amClient.allocate(0.1f); + AllocateResponse response = amClient.allocate(0.1f); // Verify latest AMRMToken can be used to send allocation request. UserGroupInformation testUser1 = @@ -953,7 +953,8 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException, .getResourceManager().getApplicationMasterService().getBindAddress()); testUser1.addToken(newVersionToken); - + AllocateRequest request = Records.newRecord(AllocateRequest.class); + request.setResponseId(response.getResponseId()); testUser1.doAs(new PrivilegedAction() { @Override public ApplicationMasterProtocol run() { @@ -962,7 +963,7 @@ public ApplicationMasterProtocol run() { yarnCluster.getResourceManager().getApplicationMasterService() .getBindAddress(), conf); } - }).allocate(Records.newRecord(AllocateRequest.class)); + }).allocate(request); // Make sure previous token has been rolled-over // and can not use this rolled-over token to make a allocate all. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index f18a77c31e..586e9dd365 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -51,11 +51,10 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; import com.google.protobuf.ByteString; -import org.apache.hadoop.yarn.proto.YarnServiceProtos; - @Private @Unstable public class ProtoUtils { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java index cb076997d2..442196cb48 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.yarn.util.resource; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; @@ -30,14 +28,11 @@ @Unstable public abstract class ResourceCalculator { - private static final Log LOG = LogFactory.getLog(ResourceCalculator.class); - public abstract int compare(Resource clusterResource, Resource lhs, Resource rhs); public static int divideAndCeil(int a, int b) { if (b == 0) { - LOG.info("divideAndCeil called with a=" + a + " b=" + b); return 0; } return (a + (b - 1)) / b; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java index a8a56d78e6..fbe9af9189 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -21,8 +21,6 @@ import java.util.ArrayList; import java.util.List; -import org.junit.Assert; - import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; import org.apache.hadoop.yarn.api.records.AMCommand; @@ -34,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; +import org.junit.Assert; import org.junit.Test; /** @@ -52,8 +51,8 @@ * License for the specific language governing permissions and limitations under * the License. */ - public class TestAllocateResponse { + @SuppressWarnings("deprecation") @Test public void testAllocateResponseWithIncDecContainers() { List incContainers = @@ -96,6 +95,7 @@ public void testAllocateResponseWithIncDecContainers() { } } + @SuppressWarnings("deprecation") @Test public void testAllocateResponseWithoutIncDecContainers() { AllocateResponse r = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index 59db66a83c..568111d56a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -399,7 +399,7 @@ public static URL newURL(String scheme, String host, int port, String file) { url.setFile(file); return url; } - + public static AllocateResponse newAllocateResponse(int responseId, List completedContainers, List allocatedContainers, List updatedNodes, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 6a00a249e2..a58dbae296 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -22,7 +22,11 @@ import java.io.InputStream; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -46,7 +50,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -63,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; @@ -106,18 +110,12 @@ public class ApplicationMasterService extends AbstractService implements RecordFactoryProvider.getRecordFactory(null); private final ConcurrentMap responseMap = new ConcurrentHashMap(); - private final AllocateResponse resync = - recordFactory.newRecordInstance(AllocateResponse.class); - private final AllocateResponse shutdown = - recordFactory.newRecordInstance(AllocateResponse.class); private final RMContext rmContext; public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { super(ApplicationMasterService.class.getName()); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.rScheduler = scheduler; - this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN); - this.resync.setAMCommand(AMCommand.AM_RESYNC); this.rmContext = rmContext; } @@ -429,36 +427,35 @@ public AllocateResponse allocate(AllocateRequest request) /* check if its in cache */ AllocateResponseLock lock = responseMap.get(appAttemptId); if (lock == null) { - LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); - return shutdown; + String message = + "Application attempt " + appAttemptId + + " doesn't exist in ApplicationMasterService cache."; + LOG.error(message); + throw new ApplicationAttemptNotFoundException(message); } synchronized (lock) { AllocateResponse lastResponse = lock.getAllocateResponse(); if (!hasApplicationMasterRegistered(appAttemptId)) { String message = - "Application Master is not registered for known application: " - + applicationId - + ". Let AM resync."; + "AM is not registered for known application attempt: " + appAttemptId + + " or RM had restarted after AM registered . AM should re-register."; LOG.info(message); RMAuditLogger.logFailure( - this.rmContext.getRMApps().get(applicationId) - .getUser(), AuditConstants.REGISTER_AM, "", - "ApplicationMasterService", message, - applicationId, - appAttemptId); - return resync; + this.rmContext.getRMApps().get(appAttemptId.getApplicationId()) + .getUser(), AuditConstants.AM_ALLOCATE, "", + "ApplicationMasterService", message, applicationId, appAttemptId); + throw new ApplicationMasterNotRegisteredException(message); } if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { /* old heartbeat */ return lastResponse; } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { - LOG.error("Invalid responseid from appAttemptId " + appAttemptId); - // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: - // Reboot is not useful since after AM reboots, it will send register - // and - // get an exception. Might as well throw an exception here. - return resync; + String message = + "Invalid responseId in AllocateRequest from application attempt: " + + appAttemptId + ", expect responseId to be " + + (lastResponse.getResponseId() + 1); + throw new InvalidApplicationMasterRequestException(message); } //filter illegal progress values diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java index 6dd67c792b..db8a46a0d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAuditLogger.java @@ -50,6 +50,7 @@ public static class AuditConstants { public static final String FINISH_FAILED_APP = "Application Finished - Failed"; public static final String FINISH_KILLED_APP = "Application Finished - Killed"; public static final String REGISTER_AM = "Register App Master"; + public static final String AM_ALLOCATE = "App Master Heartbeats"; public static final String UNREGISTER_AM = "Unregister App Master"; public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String RELEASE_CONTAINER = "AM Released Container"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java index 36182f5166..11cd1fd61a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterLauncher.java @@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -44,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; @@ -195,29 +196,33 @@ public void testallocateBeforeAMRegistration() throws Exception { // request for containers int request = 2; - AllocateResponse ar = - am.allocate("h1", 1000, request, new ArrayList()); - Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + AllocateResponse ar = null; + try { + ar = am.allocate("h1", 1000, request, new ArrayList()); + Assert.fail(); + } catch (ApplicationMasterNotRegisteredException e) { + } // kick the scheduler nm1.nodeHeartbeat(true); - AllocateResponse amrs = - am.allocate(new ArrayList(), + + AllocateResponse amrs = null; + try { + amrs = am.allocate(new ArrayList(), new ArrayList()); - Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); + Assert.fail(); + } catch (ApplicationMasterNotRegisteredException e) { + } am.registerAppAttempt(); - thrown = false; try { - am.registerAppAttempt(false); - } - catch (Exception e) { + am.registerAppAttempt(false); + Assert.fail(); + } catch (Exception e) { Assert.assertEquals("Application Master is already registered : " + attempt.getAppAttemptId().getApplicationId(), e.getMessage()); - thrown = true; } - Assert.assertTrue(thrown); // Simulate an AM that was disconnected and app attempt was removed // (responseMap does not contain attemptid) @@ -226,9 +231,11 @@ public void testallocateBeforeAMRegistration() throws Exception { ContainerState.COMPLETE); am.waitForState(RMAppAttemptState.FINISHED); - AllocateResponse amrs2 = - am.allocate(new ArrayList(), - new ArrayList()); - Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); + try { + amrs = am.allocate(new ArrayList(), + new ArrayList()); + Assert.fail(); + } catch (ApplicationAttemptNotFoundException e) { + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index b37b648ae8..9502ebac72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -52,7 +52,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.service.Service.STATE; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; @@ -61,7 +60,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -78,6 +76,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -316,10 +315,13 @@ public void testRMRestart() throws Exception { // verify old AM is not accepted // change running AM to talk to new RM am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); - AllocateResponse allocResponse = am1.allocate( - new ArrayList(), + try { + am1.allocate(new ArrayList(), new ArrayList()); - Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); + Assert.fail(); + } catch (ApplicationAttemptNotFoundException e) { + Assert.assertTrue(e instanceof ApplicationAttemptNotFoundException); + } // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); @@ -1749,8 +1751,7 @@ public void testQueueMetricsOnRMRestart() throws Exception { nm1.setResourceTrackerService(rm2.getResourceTrackerService()); // recover app RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); - am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); - am1.allocate(new ArrayList(), new ArrayList()); + nm1.nodeHeartbeat(true); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java index 0628fdd4f4..d609d7a769 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCResponseId.java @@ -20,13 +20,11 @@ import java.security.PrivilegedExceptionAction; -import org.junit.Assert; - import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -35,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -107,7 +106,12 @@ public void testARRMResponseId() throws Exception { /** try sending old request again **/ allocateRequest = AllocateRequest.newInstance(0, 0F, null, null, null); - response = allocate(attempt.getAppAttemptId(), allocateRequest); - Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC); + + try { + allocate(attempt.getAppAttemptId(), allocateRequest); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e.getCause() instanceof InvalidApplicationMasterRequestException); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java index a0d5d84bdc..be833a1613 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestAMRMTokens.java @@ -23,6 +23,7 @@ import java.security.PrivilegedAction; import java.util.Arrays; import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -206,6 +207,7 @@ public void testTokenExpiry() throws Exception { * * @throws Exception */ + @SuppressWarnings("deprecation") @Test public void testMasterKeyRollOver() throws Exception {