YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and made related MR changes. Contributed by Jian He.

This commit is contained in:
Zhijie Shen 2014-10-23 21:56:03 -07:00
parent 57dec28807
commit 0f3b6900be
23 changed files with 196 additions and 215 deletions

View File

@ -36,12 +36,13 @@
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -98,11 +99,24 @@ protected synchronized void heartbeat() throws Exception {
AllocateRequest.newInstance(this.lastResponseID, AllocateRequest.newInstance(this.lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(), super.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>(), null); new ArrayList<ContainerId>(), null);
AllocateResponse allocateResponse;
try { try {
allocateResponse = scheduler.allocate(allocateRequest); scheduler.allocate(allocateRequest);
// Reset retry count if no exception occurred. // Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis(); retrystartTime = System.currentTimeMillis();
} catch (ApplicationAttemptNotFoundException e) {
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException(
"Resource Manager doesn't recognize AttemptId: "
+ this.getContext().getApplicationID(), e);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resync and send outstanding requests.");
this.lastResponseID = 0;
register();
} catch (Exception e) { } catch (Exception e) {
// This can happen when the connection to the RM has gone down. Keep // This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired. // re-trying until the retryInterval has expired.
@ -117,29 +131,6 @@ protected synchronized void heartbeat() throws Exception {
// continue to attempt to contact the RM. // continue to attempt to contact the RM.
throw e; throw e;
} }
if (allocateResponse.getAMCommand() != null) {
switch(allocateResponse.getAMCommand()) {
case AM_RESYNC:
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
this.lastResponseID = 0;
register();
break;
case AM_SHUTDOWN:
LOG.info("Event from RM: shutting down Application Master");
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
default:
String msg =
"Unhandled value of AMCommand: " + allocateResponse.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -76,6 +76,8 @@
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@ -246,7 +248,7 @@ public void run() {
protected synchronized void heartbeat() throws Exception { protected synchronized void heartbeat() throws Exception {
scheduleStats.updateAndLogIfChanged("Before Scheduling: "); scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
List<Container> allocatedContainers = getResources(); List<Container> allocatedContainers = getResources();
if (allocatedContainers.size() > 0) { if (allocatedContainers != null && allocatedContainers.size() > 0) {
scheduledRequests.assign(allocatedContainers); scheduledRequests.assign(allocatedContainers);
} }
@ -675,6 +677,22 @@ private List<Container> getResources() throws Exception {
response = makeRemoteRequest(); response = makeRemoteRequest();
// Reset retry count if no exception occurred. // Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis(); retrystartTime = System.currentTimeMillis();
} catch (ApplicationAttemptNotFoundException e ) {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException(
"Resource Manager doesn't recognize AttemptId: "
+ this.getContext().getApplicationID(), e);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resync and send outstanding requests.");
// RM may have restarted, re-register with RM.
lastResponseID = 0;
register();
addOutstandingRequestOnResync();
return null;
} catch (Exception e) { } catch (Exception e) {
// This can happen when the connection to the RM has gone down. Keep // This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired. // re-trying until the retryInterval has expired.
@ -689,32 +707,6 @@ private List<Container> getResources() throws Exception {
// continue to attempt to contact the RM. // continue to attempt to contact the RM.
throw e; throw e;
} }
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
case AM_RESYNC:
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing.");
lastResponseID = 0;
// Registering to allow RM to discover an active AM for this
// application
register();
addOutstandingRequestOnResync();
break;
case AM_SHUTDOWN:
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new YarnRuntimeException("Resource Manager doesn't recognize AttemptId: " +
this.getContext().getApplicationID());
default:
String msg =
"Unhandled value of AMCommand: " + response.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
}
Resource newHeadRoom = Resource newHeadRoom =
getAvailableResources() == null ? Resources.none() getAvailableResources() == null ? Resources.none()
: getAvailableResources(); : getAvailableResources();

View File

@ -29,7 +29,6 @@
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -40,7 +39,6 @@
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -51,6 +49,8 @@
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Keeps the data structures to send container requests to RM. * Keeps the data structures to send container requests to RM.
@ -176,7 +176,8 @@ protected void serviceInit(Configuration conf) throws Exception {
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
} }
protected AllocateResponse makeRemoteRequest() throws IOException { protected AllocateResponse makeRemoteRequest() throws YarnException,
IOException {
ResourceBlacklistRequest blacklistRequest = ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions), ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
new ArrayList<String>(blacklistRemovals)); new ArrayList<String>(blacklistRemovals));
@ -184,16 +185,7 @@ protected AllocateResponse makeRemoteRequest() throws IOException {
AllocateRequest.newInstance(lastResponseID, AllocateRequest.newInstance(lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask), super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(release), blacklistRequest); new ArrayList<ContainerId>(release), blacklistRequest);
AllocateResponse allocateResponse; AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
try {
allocateResponse = scheduler.allocate(allocateRequest);
} catch (YarnException e) {
throw new IOException(e);
}
if (isResyncCommand(allocateResponse)) {
return allocateResponse;
}
lastResponseID = allocateResponse.getResponseId(); lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources(); availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount; lastClusterNmCount = clusterNmCount;
@ -222,11 +214,6 @@ protected AllocateResponse makeRemoteRequest() throws IOException {
return allocateResponse; return allocateResponse;
} }
protected boolean isResyncCommand(AllocateResponse allocateResponse) {
return allocateResponse.getAMCommand() != null
&& allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
}
protected void addOutstandingRequestOnResync() { protected void addOutstandingRequestOnResync() {
for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
.values()) { .values()) {

View File

@ -31,15 +31,17 @@
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -50,6 +52,9 @@
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal; import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
@ -87,9 +92,11 @@
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -103,10 +110,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -1752,14 +1761,11 @@ public void updateSchedulerProxy(MyResourceManager rm) {
} }
@Override @Override
protected AllocateResponse makeRemoteRequest() throws IOException { protected AllocateResponse makeRemoteRequest() throws IOException,
YarnException {
allocateResponse = super.makeRemoteRequest(); allocateResponse = super.makeRemoteRequest();
return allocateResponse; return allocateResponse;
} }
public boolean isResyncCommand() {
return super.isResyncCommand(allocateResponse);
}
} }
@Test @Test
@ -2255,8 +2261,6 @@ public void testRMContainerAllocatorResendsRequestsOnRMRestart()
// send allocate request to 2nd RM and get resync command // send allocate request to 2nd RM and get resync command
allocator.schedule(); allocator.schedule();
dispatcher.await(); dispatcher.await();
Assert.assertTrue("Last allocate response is not RESYNC",
allocator.isResyncCommand());
// Step-5 : On Resync,AM sends all outstanding // Step-5 : On Resync,AM sends all outstanding
// asks,release,blacklistAaddition // asks,release,blacklistAaddition

View File

@ -388,6 +388,9 @@ Release 2.6.0 - UNRELEASED
YARN-2709. Made timeline client getDelegationToken API retry if ConnectException YARN-2709. Made timeline client getDelegationToken API retry if ConnectException
happens. (Li Lu via zjshen) happens. (Li Lu via zjshen)
YARN-2209. Replaced AM resync/shutdown command with corresponding exceptions and
made related MR changes. (Jian He via zjshen)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -86,7 +86,7 @@ public static AllocateResponse newInstance(int responseId,
response.setNMTokens(nmTokens); response.setNMTokens(nmTokens);
return response; return response;
} }
@Public @Public
@Stable @Stable
public static AllocateResponse newInstance(int responseId, public static AllocateResponse newInstance(int responseId,

View File

@ -20,7 +20,11 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
/** /**
* Command sent by the Resource Manager to the Application Master in the * Command sent by the Resource Manager to the Application Master in the
@ -30,16 +34,26 @@
@Public @Public
@Unstable @Unstable
public enum AMCommand { public enum AMCommand {
/** /**
* Sent by Resource Manager when it is out of sync with the AM and wants the * @deprecated Sent by Resource Manager when it is out of sync with the AM and
* AM get back in sync. * wants the AM get back in sync.
*
* Note: Instead of sending this command,
* {@link ApplicationMasterNotRegisteredException} will be thrown
* when ApplicationMaster is out of sync with ResourceManager and
* ApplicationMaster is expected to re-register with RM by calling
* {@link ApplicationMasterProtocol#registerApplicationMaster(RegisterApplicationMasterRequest)}
*/ */
AM_RESYNC, AM_RESYNC,
/** /**
* Sent by Resource Manager when it wants the AM to shutdown. Eg. when the * @deprecated Sent by Resource Manager when it wants the AM to shutdown.
* node is going down for maintenance. The AM should save any state and * Note: This command was earlier sent by ResourceManager to
* prepare to be restarted at a later time. * instruct AM to shutdown if RM had restarted. Now
* {@link ApplicationAttemptNotFoundException} will be thrown in case
* that RM has restarted and AM is supposed to handle this
* exception by shutting down itself.
*/ */
AM_SHUTDOWN AM_SHUTDOWN
} }

View File

@ -21,12 +21,16 @@
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
/** /**
* This exception is thrown on * This exception is thrown on
* {@link ApplicationHistoryProtocol#getApplicationAttemptReport (GetApplicationAttemptReportRequest)} * {@link ApplicationHistoryProtocol#getApplicationAttemptReport (GetApplicationAttemptReportRequest)}
* API when the Application Attempt doesn't exist in Application History Server * API when the Application Attempt doesn't exist in Application History Server or
* {@link ApplicationMasterProtocol#allocate(AllocateRequest)} if application
* doesn't exist in RM.
*/ */
@Public @Public
@Unstable @Unstable

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -43,6 +42,7 @@
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -219,9 +219,13 @@ public void run() {
if (!keepRunning) { if (!keepRunning) {
return; return;
} }
try { try {
response = client.allocate(progress); response = client.allocate(progress);
} catch (ApplicationAttemptNotFoundException e) {
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
return;
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.error("Exception on heartbeat", ex); LOG.error("Exception on heartbeat", ex);
savedException = ex; savedException = ex;
@ -229,21 +233,17 @@ public void run() {
handlerThread.interrupt(); handlerThread.interrupt();
return; return;
} }
} if (response != null) {
if (response != null) { while (true) {
while (true) { try {
try { responseQueue.put(response);
responseQueue.put(response); break;
if (response.getAMCommand() == AMCommand.AM_SHUTDOWN) { } catch (InterruptedException ex) {
return; LOG.debug("Interrupted while waiting to put on response queue", ex);
} }
break;
} catch (InterruptedException ex) {
LOG.debug("Interrupted while waiting to put on response queue", ex);
} }
} }
} }
try { try {
Thread.sleep(heartbeatIntervalMs.get()); Thread.sleep(heartbeatIntervalMs.get());
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
@ -276,20 +276,6 @@ public void run() {
LOG.info("Interrupted while waiting for queue", ex); LOG.info("Interrupted while waiting for queue", ex);
continue; continue;
} }
if (response.getAMCommand() != null) {
switch(response.getAMCommand()) {
case AM_SHUTDOWN:
handler.onShutdownRequest();
LOG.info("Shutdown requested. Stopping callback.");
return;
default:
String msg =
"Unhandled value of RM AMCommand: " + response.getAMCommand();
LOG.error(msg);
throw new YarnRuntimeException(msg);
}
}
List<NodeReport> updatedNodes = response.getUpdatedNodes(); List<NodeReport> updatedNodes = response.getUpdatedNodes();
if (!updatedNodes.isEmpty()) { if (!updatedNodes.isEmpty()) {
handler.onNodesUpdated(updatedNodes); handler.onNodesUpdated(updatedNodes);

View File

@ -49,7 +49,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@ -275,15 +274,16 @@ public AllocateResponse allocate(float progressIndicator)
blacklistRemovals.clear(); blacklistRemovals.clear();
} }
allocateResponse = rmClient.allocate(allocateRequest); try {
if (isResyncCommand(allocateResponse)) { allocateResponse = rmClient.allocate(allocateRequest);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.warn("ApplicationMaster is out of sync with ResourceManager," LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resyncing."); + " hence resyncing.");
synchronized (this) { synchronized (this) {
release.addAll(this.pendingRelease); release.addAll(this.pendingRelease);
blacklistAdditions.addAll(this.blacklistedNodes); blacklistAdditions.addAll(this.blacklistedNodes);
for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable for (Map<String, TreeMap<Resource, ResourceRequestInfo>> rr : remoteRequestsTable
.values()) { .values()) {
for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) { for (Map<Resource, ResourceRequestInfo> capabalities : rr.values()) {
for (ResourceRequestInfo request : capabalities.values()) { for (ResourceRequestInfo request : capabalities.values()) {
addResourceRequestToAsk(request.remoteRequest); addResourceRequestToAsk(request.remoteRequest);
@ -293,7 +293,8 @@ public AllocateResponse allocate(float progressIndicator)
} }
// re register with RM // re register with RM
registerApplicationMaster(); registerApplicationMaster();
return allocate(progressIndicator); allocateResponse = allocate(progressIndicator);
return allocateResponse;
} }
synchronized (this) { synchronized (this) {
@ -349,11 +350,6 @@ protected void removePendingReleaseRequests(
} }
} }
private boolean isResyncCommand(AllocateResponse allocateResponse) {
return allocateResponse.getAMCommand() != null
&& allocateResponse.getAMCommand() == AMCommand.AM_RESYNC;
}
@Private @Private
@VisibleForTesting @VisibleForTesting
protected void populateNMTokens(List<NMToken> nmTokens) { protected void populateNMTokens(List<NMToken> nmTokens) {

View File

@ -827,7 +827,7 @@ public AllocateResponse createFakeAllocateResponse() {
return AllocateResponse.newInstance(-1, return AllocateResponse.newInstance(-1,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>(),
new ArrayList<Container>(), new ArrayList<NodeReport>(), new ArrayList<Container>(), new ArrayList<NodeReport>(),
Resource.newInstance(1024, 2), AMCommand.AM_RESYNC, 1, Resource.newInstance(1024, 2), null, 1,
null, new ArrayList<NMToken>()); null, new ArrayList<NMToken>());
} }
} }

View File

@ -18,16 +18,15 @@
package org.apache.hadoop.yarn.client.api.async.impl; package org.apache.hadoop.yarn.client.api.async.impl;
import com.google.common.base.Supplier; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -36,13 +35,10 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
@ -56,12 +52,16 @@
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl; import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
public class TestAMRMClientAsync { public class TestAMRMClientAsync {
private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class); private static final Log LOG = LogFactory.getLog(TestAMRMClientAsync.class);
@ -211,10 +211,10 @@ public void testAMRMClientAsyncShutDown() throws Exception {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
final AllocateResponse shutDownResponse = createAllocateResponse( createAllocateResponse(new ArrayList<ContainerStatus>(),
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null); new ArrayList<Container>(), null);
shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN); when(client.allocate(anyFloat())).thenThrow(
when(client.allocate(anyFloat())).thenReturn(shutDownResponse); new ApplicationAttemptNotFoundException("app not found, shut down"));
AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler); AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);
@ -235,11 +235,8 @@ public void testAMRMClientAsyncShutDownWithWaitFor() throws Exception {
final TestCallbackHandler callbackHandler = new TestCallbackHandler(); final TestCallbackHandler callbackHandler = new TestCallbackHandler();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class); AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
when(client.allocate(anyFloat())).thenThrow(
final AllocateResponse shutDownResponse = createAllocateResponse( new ApplicationAttemptNotFoundException("app not found, shut down"));
new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
shutDownResponse.setAMCommand(AMCommand.AM_SHUTDOWN);
when(client.allocate(anyFloat())).thenReturn(shutDownResponse);
AMRMClientAsync<ContainerRequest> asyncClient = AMRMClientAsync<ContainerRequest> asyncClient =
AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler); AMRMClientAsync.createAMRMClientAsync(client, 10, callbackHandler);

View File

@ -932,7 +932,7 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
Assert.assertNotEquals(amrmToken_1, amrmToken_2); Assert.assertNotEquals(amrmToken_1, amrmToken_2);
// can do the allocate call with latest AMRMToken // can do the allocate call with latest AMRMToken
amClient.allocate(0.1f); AllocateResponse response = amClient.allocate(0.1f);
// Verify latest AMRMToken can be used to send allocation request. // Verify latest AMRMToken can be used to send allocation request.
UserGroupInformation testUser1 = UserGroupInformation testUser1 =
@ -953,7 +953,8 @@ public void testAMRMClientOnAMRMTokenRollOver() throws YarnException,
.getResourceManager().getApplicationMasterService().getBindAddress()); .getResourceManager().getApplicationMasterService().getBindAddress());
testUser1.addToken(newVersionToken); testUser1.addToken(newVersionToken);
AllocateRequest request = Records.newRecord(AllocateRequest.class);
request.setResponseId(response.getResponseId());
testUser1.doAs(new PrivilegedAction<ApplicationMasterProtocol>() { testUser1.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
@Override @Override
public ApplicationMasterProtocol run() { public ApplicationMasterProtocol run() {
@ -962,7 +963,7 @@ public ApplicationMasterProtocol run() {
yarnCluster.getResourceManager().getApplicationMasterService() yarnCluster.getResourceManager().getApplicationMasterService()
.getBindAddress(), conf); .getBindAddress(), conf);
} }
}).allocate(Records.newRecord(AllocateRequest.class)); }).allocate(request);
// Make sure previous token has been rolled-over // Make sure previous token has been rolled-over
// and can not use this rolled-over token to make a allocate all. // and can not use this rolled-over token to make a allocate all.

View File

@ -51,11 +51,10 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto; import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
@Private @Private
@Unstable @Unstable
public class ProtoUtils { public class ProtoUtils {

View File

@ -17,8 +17,6 @@
*/ */
package org.apache.hadoop.yarn.util.resource; package org.apache.hadoop.yarn.util.resource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -30,14 +28,11 @@
@Unstable @Unstable
public abstract class ResourceCalculator { public abstract class ResourceCalculator {
private static final Log LOG = LogFactory.getLog(ResourceCalculator.class);
public abstract int public abstract int
compare(Resource clusterResource, Resource lhs, Resource rhs); compare(Resource clusterResource, Resource lhs, Resource rhs);
public static int divideAndCeil(int a, int b) { public static int divideAndCeil(int a, int b) {
if (b == 0) { if (b == 0) {
LOG.info("divideAndCeil called with a=" + a + " b=" + b);
return 0; return 0;
} }
return (a + (b - 1)) / b; return (a + (b - 1)) / b;

View File

@ -21,8 +21,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.junit.Assert;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.AMCommand;
@ -34,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
/** /**
@ -52,8 +51,8 @@
* License for the specific language governing permissions and limitations under * License for the specific language governing permissions and limitations under
* the License. * the License.
*/ */
public class TestAllocateResponse { public class TestAllocateResponse {
@SuppressWarnings("deprecation")
@Test @Test
public void testAllocateResponseWithIncDecContainers() { public void testAllocateResponseWithIncDecContainers() {
List<ContainerResourceIncrease> incContainers = List<ContainerResourceIncrease> incContainers =
@ -96,6 +95,7 @@ public void testAllocateResponseWithIncDecContainers() {
} }
} }
@SuppressWarnings("deprecation")
@Test @Test
public void testAllocateResponseWithoutIncDecContainers() { public void testAllocateResponseWithoutIncDecContainers() {
AllocateResponse r = AllocateResponse r =

View File

@ -399,7 +399,7 @@ public static URL newURL(String scheme, String host, int port, String file) {
url.setFile(file); url.setFile(file);
return url; return url;
} }
public static AllocateResponse newAllocateResponse(int responseId, public static AllocateResponse newAllocateResponse(int responseId,
List<ContainerStatus> completedContainers, List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes, List<Container> allocatedContainers, List<NodeReport> updatedNodes,

View File

@ -22,7 +22,11 @@
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.*; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -46,7 +50,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -63,6 +66,7 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException; import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
@ -106,18 +110,12 @@ public class ApplicationMasterService extends AbstractService implements
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap = private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>(); new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final AllocateResponse shutdown =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext; private final RMContext rmContext;
public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) { public ApplicationMasterService(RMContext rmContext, YarnScheduler scheduler) {
super(ApplicationMasterService.class.getName()); super(ApplicationMasterService.class.getName());
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rScheduler = scheduler; this.rScheduler = scheduler;
this.shutdown.setAMCommand(AMCommand.AM_SHUTDOWN);
this.resync.setAMCommand(AMCommand.AM_RESYNC);
this.rmContext = rmContext; this.rmContext = rmContext;
} }
@ -429,36 +427,35 @@ public AllocateResponse allocate(AllocateRequest request)
/* check if its in cache */ /* check if its in cache */
AllocateResponseLock lock = responseMap.get(appAttemptId); AllocateResponseLock lock = responseMap.get(appAttemptId);
if (lock == null) { if (lock == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId); String message =
return shutdown; "Application attempt " + appAttemptId
+ " doesn't exist in ApplicationMasterService cache.";
LOG.error(message);
throw new ApplicationAttemptNotFoundException(message);
} }
synchronized (lock) { synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse(); AllocateResponse lastResponse = lock.getAllocateResponse();
if (!hasApplicationMasterRegistered(appAttemptId)) { if (!hasApplicationMasterRegistered(appAttemptId)) {
String message = String message =
"Application Master is not registered for known application: " "AM is not registered for known application attempt: " + appAttemptId
+ applicationId + " or RM had restarted after AM registered . AM should re-register.";
+ ". Let AM resync.";
LOG.info(message); LOG.info(message);
RMAuditLogger.logFailure( RMAuditLogger.logFailure(
this.rmContext.getRMApps().get(applicationId) this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getUser(), AuditConstants.REGISTER_AM, "", .getUser(), AuditConstants.AM_ALLOCATE, "",
"ApplicationMasterService", message, "ApplicationMasterService", message, applicationId, appAttemptId);
applicationId, throw new ApplicationMasterNotRegisteredException(message);
appAttemptId);
return resync;
} }
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */ /* old heartbeat */
return lastResponse; return lastResponse;
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId); String message =
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: "Invalid responseId in AllocateRequest from application attempt: "
// Reboot is not useful since after AM reboots, it will send register + appAttemptId + ", expect responseId to be "
// and + (lastResponse.getResponseId() + 1);
// get an exception. Might as well throw an exception here. throw new InvalidApplicationMasterRequestException(message);
return resync;
} }
//filter illegal progress values //filter illegal progress values

View File

@ -50,6 +50,7 @@ public static class AuditConstants {
public static final String FINISH_FAILED_APP = "Application Finished - Failed"; public static final String FINISH_FAILED_APP = "Application Finished - Failed";
public static final String FINISH_KILLED_APP = "Application Finished - Killed"; public static final String FINISH_KILLED_APP = "Application Finished - Killed";
public static final String REGISTER_AM = "Register App Master"; public static final String REGISTER_AM = "Register App Master";
public static final String AM_ALLOCATE = "App Master Heartbeats";
public static final String UNREGISTER_AM = "Unregister App Master"; public static final String UNREGISTER_AM = "Unregister App Master";
public static final String ALLOC_CONTAINER = "AM Allocated Container"; public static final String ALLOC_CONTAINER = "AM Allocated Container";
public static final String RELEASE_CONTAINER = "AM Released Container"; public static final String RELEASE_CONTAINER = "AM Released Container";

View File

@ -36,7 +36,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
@ -44,6 +43,8 @@
import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@ -195,29 +196,33 @@ public void testallocateBeforeAMRegistration() throws Exception {
// request for containers // request for containers
int request = 2; int request = 2;
AllocateResponse ar = AllocateResponse ar = null;
am.allocate("h1", 1000, request, new ArrayList<ContainerId>()); try {
Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); ar = am.allocate("h1", 1000, request, new ArrayList<ContainerId>());
Assert.fail();
} catch (ApplicationMasterNotRegisteredException e) {
}
// kick the scheduler // kick the scheduler
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
AllocateResponse amrs =
am.allocate(new ArrayList<ResourceRequest>(), AllocateResponse amrs = null;
try {
amrs = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()); new ArrayList<ContainerId>());
Assert.assertTrue(ar.getAMCommand() == AMCommand.AM_RESYNC); Assert.fail();
} catch (ApplicationMasterNotRegisteredException e) {
}
am.registerAppAttempt(); am.registerAppAttempt();
thrown = false;
try { try {
am.registerAppAttempt(false); am.registerAppAttempt(false);
} Assert.fail();
catch (Exception e) { } catch (Exception e) {
Assert.assertEquals("Application Master is already registered : " Assert.assertEquals("Application Master is already registered : "
+ attempt.getAppAttemptId().getApplicationId(), + attempt.getAppAttemptId().getApplicationId(),
e.getMessage()); e.getMessage());
thrown = true;
} }
Assert.assertTrue(thrown);
// Simulate an AM that was disconnected and app attempt was removed // Simulate an AM that was disconnected and app attempt was removed
// (responseMap does not contain attemptid) // (responseMap does not contain attemptid)
@ -226,9 +231,11 @@ public void testallocateBeforeAMRegistration() throws Exception {
ContainerState.COMPLETE); ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED); am.waitForState(RMAppAttemptState.FINISHED);
AllocateResponse amrs2 = try {
am.allocate(new ArrayList<ResourceRequest>(), amrs = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()); new ArrayList<ContainerId>());
Assert.assertTrue(amrs2.getAMCommand() == AMCommand.AM_SHUTDOWN); Assert.fail();
} catch (ApplicationAttemptNotFoundException e) {
}
} }
} }

View File

@ -52,7 +52,6 @@
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey; import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
@ -61,7 +60,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -78,6 +76,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -316,10 +315,13 @@ public void testRMRestart() throws Exception {
// verify old AM is not accepted // verify old AM is not accepted
// change running AM to talk to new RM // change running AM to talk to new RM
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext()); am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
AllocateResponse allocResponse = am1.allocate( try {
new ArrayList<ResourceRequest>(), am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()); new ArrayList<ContainerId>());
Assert.assertEquals(AMCommand.AM_SHUTDOWN, allocResponse.getAMCommand()); Assert.fail();
} catch (ApplicationAttemptNotFoundException e) {
Assert.assertTrue(e instanceof ApplicationAttemptNotFoundException);
}
// NM should be rebooted on heartbeat, even first heartbeat for nm2 // NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
@ -1749,8 +1751,7 @@ public void testQueueMetricsOnRMRestart() throws Exception {
nm1.setResourceTrackerService(rm2.getResourceTrackerService()); nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app // recover app
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true); nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());

View File

@ -20,13 +20,11 @@
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import org.junit.Assert;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMCommand;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@ -35,6 +33,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -107,7 +106,12 @@ public void testARRMResponseId() throws Exception {
/** try sending old request again **/ /** try sending old request again **/
allocateRequest = AllocateRequest.newInstance(0, 0F, null, null, null); allocateRequest = AllocateRequest.newInstance(0, 0F, null, null, null);
response = allocate(attempt.getAppAttemptId(), allocateRequest);
Assert.assertTrue(response.getAMCommand() == AMCommand.AM_RESYNC); try {
allocate(attempt.getAppAttemptId(), allocateRequest);
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof InvalidApplicationMasterRequestException);
}
} }
} }

View File

@ -23,6 +23,7 @@
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -206,6 +207,7 @@ public void testTokenExpiry() throws Exception {
* *
* @throws Exception * @throws Exception
*/ */
@SuppressWarnings("deprecation")
@Test @Test
public void testMasterKeyRollOver() throws Exception { public void testMasterKeyRollOver() throws Exception {