MAPREDUCE-3078. Ensure MapReduce AM reports progress correctly for displaying on the RM Web-UI. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1176762 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-28 07:31:03 +00:00
parent f4a425e10d
commit 312a7e7100
25 changed files with 822 additions and 548 deletions

View File

@ -1469,6 +1469,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3110. Fixed TestRPC failure. (vinodkv) MAPREDUCE-3110. Fixed TestRPC failure. (vinodkv)
MAPREDUCE-3078. Ensure MapReduce AM reports progress correctly for
displaying on the RM Web-UI. (vinodkv via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -55,6 +55,12 @@
<artifactId>hadoop-yarn-server-resourcemanager</artifactId> <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId> <artifactId>hadoop-mapreduce-client-shuffle</artifactId>

View File

@ -549,9 +549,9 @@ public void start() {
// It's more test friendly to put it here. // It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster"); DefaultMetricsSystem.initialize("MRAppMaster");
/** create a job event for job intialization */ // create a job event for job intialization
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
/** send init to the job (this does NOT trigger job execution) */ // Send init to the job (this does NOT trigger job execution)
// This is a synchronous call, not an event through dispatcher. We want // This is a synchronous call, not an event through dispatcher. We want
// job-init to be done completely here. // job-init to be done completely here.
jobEventDispatcher.handle(initJobEvent); jobEventDispatcher.handle(initJobEvent);

View File

@ -92,6 +92,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
@ -584,25 +585,17 @@ public List<String> getDiagnostics() {
public JobReport getReport() { public JobReport getReport() {
readLock.lock(); readLock.lock();
try { try {
JobReport report = recordFactory.newRecordInstance(JobReport.class); JobState state = getState();
report.setJobId(jobId);
report.setJobState(getState());
// TODO - Fix to correctly setup report and to check state
if (report.getJobState() == JobState.NEW) {
return report;
}
report.setStartTime(startTime);
report.setFinishTime(finishTime);
report.setSetupProgress(setupProgress);
report.setCleanupProgress(cleanupProgress);
report.setMapProgress(computeProgress(mapTasks));
report.setReduceProgress(computeProgress(reduceTasks));
report.setJobName(jobName);
report.setUser(username);
return report; if (getState() == JobState.NEW) {
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
startTime, finishTime, setupProgress, 0.0f,
0.0f, cleanupProgress);
}
return MRBuilderUtils.newJobReport(jobId, jobName, username, state,
startTime, finishTime, setupProgress, computeProgress(mapTasks),
computeProgress(reduceTasks), cleanupProgress);
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.local; package org.apache.hadoop.mapreduce.v2.app.local;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -30,15 +31,19 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator; import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
/** /**
@ -65,6 +70,20 @@ public LocalContainerAllocator(ClientService clientService,
this.appID = context.getApplicationID(); this.appID = context.getApplicationID();
} }
@Override
protected synchronized void heartbeat() throws Exception {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
this.applicationAttemptId, this.lastResponseID, super
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AMResponse response = allocateResponse.getAMResponse();
if (response.getReboot()) {
// TODO
LOG.info("Event from RM: shutting down Application Master");
}
}
@Override @Override
public void handle(ContainerAllocatorEvent event) { public void handle(ContainerAllocatorEvent event) {
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {

View File

@ -20,7 +20,6 @@
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.ArrayList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -29,6 +28,7 @@
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
@ -42,17 +42,12 @@
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -64,7 +59,7 @@
/** /**
* Registers/unregisters to RM and sends heartbeats to RM. * Registers/unregisters to RM and sends heartbeats to RM.
*/ */
public class RMCommunicator extends AbstractService { public abstract class RMCommunicator extends AbstractService {
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class);
private int rmPollInterval;//millis private int rmPollInterval;//millis
protected ApplicationId applicationId; protected ApplicationId applicationId;
@ -74,7 +69,7 @@ public class RMCommunicator extends AbstractService {
protected EventHandler eventHandler; protected EventHandler eventHandler;
protected AMRMProtocol scheduler; protected AMRMProtocol scheduler;
private final ClientService clientService; private final ClientService clientService;
private int lastResponseID; protected int lastResponseID;
private Resource minContainerCapability; private Resource minContainerCapability;
private Resource maxContainerCapability; private Resource maxContainerCapability;
@ -121,6 +116,34 @@ protected Job getJob() {
return job; return job;
} }
/**
* Get the appProgress. Can be used only after this component is started.
* @return the appProgress.
*/
protected float getApplicationProgress() {
// For now just a single job. In future when we have a DAG, we need an
// aggregate progress.
JobReport report = this.job.getReport();
float setupWeight = 0.05f;
float cleanupWeight = 0.05f;
float mapWeight = 0.0f;
float reduceWeight = 0.0f;
int numMaps = this.job.getTotalMaps();
int numReduces = this.job.getTotalReduces();
if (numMaps == 0 && numReduces == 0) {
} else if (numMaps == 0) {
reduceWeight = 0.9f;
} else if (numReduces == 0) {
mapWeight = 0.9f;
} else {
mapWeight = reduceWeight = 0.45f;
}
return (report.getSetupProgress() * setupWeight
+ report.getCleanupProgress() * cleanupWeight
+ report.getMapProgress() * mapWeight + report.getReduceProgress()
* reduceWeight);
}
protected void register() { protected void register() {
//Register //Register
String host = String host =
@ -262,18 +285,5 @@ public AMRMProtocol run() {
}); });
} }
protected synchronized void heartbeat() throws Exception { protected abstract void heartbeat() throws Exception;
AllocateRequest allocateRequest =
recordFactory.newRecordInstance(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
allocateRequest.setResponseId(lastResponseID);
allocateRequest.addAllAsks(new ArrayList<ResourceRequest>());
allocateRequest.addAllReleases(new ArrayList<ContainerId>());
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AMResponse response = allocateResponse.getAMResponse();
if (response.getReboot()) {
LOG.info("Event from RM: shutting down Application Master");
}
}
} }

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
/** /**
* Keeps the data structures to send container requests to RM. * Keeps the data structures to send container requests to RM.
@ -107,15 +108,11 @@ public void init(Configuration conf) {
LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode); LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
} }
protected abstract void heartbeat() throws Exception;
protected AMResponse makeRemoteRequest() throws YarnRemoteException { protected AMResponse makeRemoteRequest() throws YarnRemoteException {
AllocateRequest allocateRequest = recordFactory AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
.newRecordInstance(AllocateRequest.class); applicationAttemptId, lastResponseID, super.getApplicationProgress(),
allocateRequest.setApplicationAttemptId(applicationAttemptId); new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
allocateRequest.setResponseId(lastResponseID); release));
allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
allocateRequest.addAllReleases(new ArrayList<ContainerId>(release));
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AMResponse response = allocateResponse.getAMResponse(); AMResponse response = allocateResponse.getAMResponse();
lastResponseID = response.getResponseId(); lastResponseID = response.getResponseId();

View File

@ -19,27 +19,25 @@
package org.apache.hadoop.mapreduce.v2.util; package org.apache.hadoop.mapreduce.v2.util;
import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
public class MRBuilderUtils { public class MRBuilderUtils {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
public static JobId newJobId(ApplicationId appId, int id) { public static JobId newJobId(ApplicationId appId, int id) {
JobId jobId = recordFactory.newRecordInstance(JobId.class); JobId jobId = Records.newRecord(JobId.class);
jobId.setAppId(appId); jobId.setAppId(appId);
jobId.setId(id); jobId.setId(id);
return jobId; return jobId;
} }
public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) { public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) {
TaskId taskId = recordFactory.newRecordInstance(TaskId.class); TaskId taskId = Records.newRecord(TaskId.class);
taskId.setJobId(jobId); taskId.setJobId(jobId);
taskId.setId(id); taskId.setId(id);
taskId.setTaskType(taskType); taskId.setTaskType(taskType);
@ -48,9 +46,27 @@ public static TaskId newTaskId(JobId jobId, int id, TaskType taskType) {
public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) { public static TaskAttemptId newTaskAttemptId(TaskId taskId, int attemptId) {
TaskAttemptId taskAttemptId = TaskAttemptId taskAttemptId =
recordFactory.newRecordInstance(TaskAttemptId.class); Records.newRecord(TaskAttemptId.class);
taskAttemptId.setTaskId(taskId); taskAttemptId.setTaskId(taskId);
taskAttemptId.setId(attemptId); taskAttemptId.setId(attemptId);
return taskAttemptId; return taskAttemptId;
} }
public static JobReport newJobReport(JobId jobId, String jobName,
String userName, JobState state, long startTime, long finishTime,
float setupProgress, float mapProgress, float reduceProgress,
float cleanupProgress) {
JobReport report = Records.newRecord(JobReport.class);
report.setJobId(jobId);
report.setJobName(jobName);
report.setUser(userName);
report.setJobState(state);
report.setStartTime(startTime);
report.setFinishTime(finishTime);
report.setSetupProgress(setupProgress);
report.setCleanupProgress(cleanupProgress);
report.setMapProgress(mapProgress);
report.setReduceProgress(reduceProgress);
return report;
}
} }

View File

@ -64,6 +64,12 @@
<artifactId>hadoop-yarn-server-resourcemanager</artifactId> <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId> <artifactId>hadoop-yarn-server-common</artifactId>

View File

@ -88,6 +88,12 @@
<artifactId>hadoop-yarn-server-resourcemanager</artifactId> <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<version>${yarn.version}</version> <version>${yarn.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<version>${yarn.version}</version>
<type>test-jar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId> <artifactId>hadoop-mapreduce-client-core</artifactId>

View File

@ -20,7 +20,9 @@
import java.net.URI; import java.net.URI;
import java.util.Comparator; import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
@ -184,6 +186,13 @@ public static ContainerId newContainerId(RecordFactory recordFactory,
return id; return id;
} }
public static NodeId newNodeId(String host, int port) {
NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
nodeId.setHost(host);
nodeId.setPort(port);
return nodeId;
}
public static Container newContainer(RecordFactory recordFactory, public static Container newContainer(RecordFactory recordFactory,
ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId, ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority) { String nodeHttpAddress, Resource resource, Priority priority) {
@ -266,5 +275,18 @@ public static URL newURL(String scheme, String host, int port, String file) {
url.setFile(file); url.setFile(file);
return url; return url;
} }
public static AllocateRequest newAllocateRequest(
ApplicationAttemptId applicationAttemptId, int responseID,
float appProgress, List<ResourceRequest> resourceAsk,
List<ContainerId> containersToBeReleased) {
AllocateRequest allocateRequest = recordFactory
.newRecordInstance(AllocateRequest.class);
allocateRequest.setApplicationAttemptId(applicationAttemptId);
allocateRequest.setResponseId(responseID);
allocateRequest.setProgress(appProgress);
allocateRequest.addAllAsks(resourceAsk);
allocateRequest.addAllReleases(containersToBeReleased);
return allocateRequest;
}
} }

View File

@ -37,6 +37,20 @@
<build> <build>
<plugins> <plugins>
<!-- Publish tests jar -->
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
<plugin> <plugin>
<artifactId>maven-antrun-plugin</artifactId> <artifactId>maven-antrun-plugin</artifactId>
<executions> <executions>

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier; import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager; import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@ -250,13 +251,10 @@ protected synchronized void submitApplication(
if (rmContext.getRMApps().putIfAbsent(applicationId, application) != if (rmContext.getRMApps().putIfAbsent(applicationId, application) !=
null) { null) {
LOG.info("Application with id " + applicationId + String message = "Application with id " + applicationId
" is already present! Cannot add a duplicate!"); + " is already present! Cannot add a duplicate!";
// don't send event through dispatcher as it will be handled by app LOG.info(message);
// already present with this id. throw RPCUtil.getRemoteException(message);
application.handle(new RMAppRejectedEvent(applicationId,
"Application with this id is already present! " +
"Cannot add a duplicate!"));
} else { } else {
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMAppEvent(applicationId, RMAppEventType.START)); new RMAppEvent(applicationId, RMAppEventType.START));

View File

@ -98,7 +98,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ContainerAllocationExpirer containerAllocationExpirer; private ContainerAllocationExpirer containerAllocationExpirer;
protected NMLivelinessMonitor nmLivelinessMonitor; protected NMLivelinessMonitor nmLivelinessMonitor;
protected NodesListManager nodesListManager; protected NodesListManager nodesListManager;
private SchedulerEventDispatcher schedulerDispatcher; private EventHandler<SchedulerEvent> schedulerDispatcher;
protected RMAppManager rmAppManager; protected RMAppManager rmAppManager;
private WebApp webApp; private WebApp webApp;
@ -119,7 +119,7 @@ public RMContext getRMContext() {
@Override @Override
public synchronized void init(Configuration conf) { public synchronized void init(Configuration conf) {
this.rmDispatcher = new AsyncDispatcher(); this.rmDispatcher = createDispatcher();
addIfService(this.rmDispatcher); addIfService(this.rmDispatcher);
this.containerAllocationExpirer = new ContainerAllocationExpirer( this.containerAllocationExpirer = new ContainerAllocationExpirer(
@ -138,8 +138,8 @@ public synchronized void init(Configuration conf) {
this.conf = new YarnConfiguration(conf); this.conf = new YarnConfiguration(conf);
// Initialize the scheduler // Initialize the scheduler
this.scheduler = createScheduler(); this.scheduler = createScheduler();
this.schedulerDispatcher = new SchedulerEventDispatcher(this.scheduler); this.schedulerDispatcher = createSchedulerEventDispatcher();
addService(this.schedulerDispatcher); addIfService(this.schedulerDispatcher);
this.rmDispatcher.register(SchedulerEventType.class, this.rmDispatcher.register(SchedulerEventType.class,
this.schedulerDispatcher); this.schedulerDispatcher);
@ -195,6 +195,14 @@ public synchronized void init(Configuration conf) {
super.init(conf); super.init(conf);
} }
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler);
}
protected Dispatcher createDispatcher() {
return new AsyncDispatcher();
}
protected void addIfService(Object object) { protected void addIfService(Object object) {
if (object instanceof Service) { if (object instanceof Service) {
addService((Service) object); addService((Service) object);

View File

@ -32,7 +32,7 @@
* look at {@link RMAppImpl} for its implementation. This interface * look at {@link RMAppImpl} for its implementation. This interface
* exposes methods to access various updates in application status/report. * exposes methods to access various updates in application status/report.
*/ */
public interface RMApp extends EventHandler<RMAppEvent>{ public interface RMApp extends EventHandler<RMAppEvent> {
/** /**
* The application id for this {@link RMApp}. * The application id for this {@link RMApp}.

View File

@ -36,7 +36,7 @@
* {@link YarnConfiguration#RM_AM_MAX_RETRIES}. For specific * {@link YarnConfiguration#RM_AM_MAX_RETRIES}. For specific
* implementation take a look at {@link RMAppAttemptImpl}. * implementation take a look at {@link RMAppAttemptImpl}.
*/ */
public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
/** /**
* Get the application attempt id for this {@link RMAppAttempt}. * Get the application attempt id for this {@link RMAppAttempt}.

View File

@ -685,6 +685,8 @@ public FinalTransition(RMAppAttemptState finalAttemptState) {
public void transition(RMAppAttemptImpl appAttempt, public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) { RMAppAttemptEvent event) {
appAttempt.progress = 1.0f;
// Tell the app and the scheduler // Tell the app and the scheduler
super.transition(appAttempt, event); super.transition(appAttempt, event);

View File

@ -207,13 +207,18 @@ synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
.getDispatcher().getEventHandler(), this.rmContext .getDispatcher().getEventHandler(), this.rmContext
.getContainerAllocationExpirer()); .getContainerAllocationExpirer());
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations // Update consumption and track allocations
appSchedulingInfo.allocate(type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
// Inform the container // Inform the container
rmContainer.handle( rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START)); new RMContainerEvent(container.getId(), RMContainerEventType.START));
Resources.addTo(currentConsumption, container.getResource());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationAttemptId=" LOG.debug("allocate: applicationAttemptId="
+ container.getId().getApplicationAttemptId() + container.getId().getApplicationAttemptId()
@ -223,12 +228,6 @@ synchronized public RMContainer allocate(NodeType type, SchedulerNode node,
RMAuditLogger.logSuccess(getUser(), RMAuditLogger.logSuccess(getUser(),
AuditConstants.ALLOC_CONTAINER, "SchedulerApp", AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
getApplicationId(), container.getId()); getApplicationId(), container.getId());
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
liveContainers.put(container.getId(), rmContainer);
appSchedulingInfo.allocate(type, node, priority, request, container);
return rmContainer; return rmContainer;
} }

View File

@ -19,10 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

View File

@ -291,7 +291,7 @@ private SchedulerNode getNode(NodeId nodeId) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private synchronized void addApplication(ApplicationAttemptId appAttemptId, private synchronized void addApplication(ApplicationAttemptId appAttemptId,
String queueName, String user) { String user) {
// TODO: Fix store // TODO: Fix store
SchedulerApp schedulerApp = SchedulerApp schedulerApp =
new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
@ -628,7 +628,7 @@ public void handle(SchedulerEvent event) {
{ {
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent addApplication(appAddedEvent.getApplicationAttemptId(), appAddedEvent
.getQueue(), appAddedEvent.getUser()); .getUser());
} }
break; break;
case APP_REMOVED: case APP_REMOVED:

View File

@ -38,6 +38,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
public class MockAM { public class MockAM {
@ -128,7 +129,7 @@ public ResourceRequest createResourceReq(String resource, int memory, int priori
req.setHostName(resource); req.setHostName(resource);
req.setNumContainers(containers); req.setNumContainers(containers);
Priority pri = Records.newRecord(Priority.class); Priority pri = Records.newRecord(Priority.class);
pri.setPriority(1); pri.setPriority(priority);
req.setPriority(pri); req.setPriority(pri);
Resource capability = Records.newRecord(Resource.class); Resource capability = Records.newRecord(Resource.class);
capability.setMemory(memory); capability.setMemory(memory);
@ -139,11 +140,8 @@ public ResourceRequest createResourceReq(String resource, int memory, int priori
public AMResponse allocate( public AMResponse allocate(
List<ResourceRequest> resourceRequest, List<ContainerId> releases) List<ResourceRequest> resourceRequest, List<ContainerId> releases)
throws Exception { throws Exception {
AllocateRequest req = Records.newRecord(AllocateRequest.class); AllocateRequest req = BuilderUtils.newAllocateRequest(attemptId,
req.setResponseId(++responseId); ++responseId, 0F, resourceRequest, releases);
req.setApplicationAttemptId(attemptId);
req.addAllAsks(resourceRequest);
req.addAllReleases(releases);
AllocateResponse resp = amRMProtocol.allocate(req); AllocateResponse resp = amRMProtocol.allocate(req);
return resp.getAMResponse(); return resp.getAMResponse();
} }

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -77,13 +78,14 @@ public void testARRMResponseId() throws Exception {
am.registerAppAttempt(); am.registerAppAttempt();
AllocateRequest allocateRequest = recordFactory.newRecordInstance(AllocateRequest.class); AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(attempt
allocateRequest.setApplicationAttemptId(attempt.getAppAttemptId()); .getAppAttemptId(), 0, 0F, null, null);
AMResponse response = amService.allocate(allocateRequest).getAMResponse(); AMResponse response = amService.allocate(allocateRequest).getAMResponse();
Assert.assertEquals(1, response.getResponseId()); Assert.assertEquals(1, response.getResponseId());
Assert.assertFalse(response.getReboot()); Assert.assertFalse(response.getReboot());
allocateRequest.setResponseId(response.getResponseId()); allocateRequest = BuilderUtils.newAllocateRequest(attempt
.getAppAttemptId(), response.getResponseId(), 0F, null, null);
response = amService.allocate(allocateRequest).getAMResponse(); response = amService.allocate(allocateRequest).getAMResponse();
Assert.assertEquals(2, response.getResponseId()); Assert.assertEquals(2, response.getResponseId());
@ -91,8 +93,9 @@ public void testARRMResponseId() throws Exception {
response = amService.allocate(allocateRequest).getAMResponse(); response = amService.allocate(allocateRequest).getAMResponse();
Assert.assertEquals(2, response.getResponseId()); Assert.assertEquals(2, response.getResponseId());
/** try sending old **/ /** try sending old request again **/
allocateRequest.setResponseId(0); allocateRequest = BuilderUtils.newAllocateRequest(attempt
.getAppAttemptId(), 0, 0F, null, null);
response = amService.allocate(allocateRequest).getAMResponse(); response = amService.allocate(allocateRequest).getAMResponse();
Assert.assertTrue(response.getReboot()); Assert.assertTrue(response.getReboot());
} }

View File

@ -162,6 +162,7 @@ public void setDiagnostics(String diag) {
this.diagnostics = new StringBuilder(diag); this.diagnostics = new StringBuilder(diag);
} }
@Override
public void handle(RMAppEvent event) { public void handle(RMAppEvent event) {
} }

View File

@ -83,6 +83,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.AfterClass; import org.junit.AfterClass;
@ -240,12 +241,8 @@ public AMRMProtocol run() {
ask.add(rr); ask.add(rr);
ArrayList<ContainerId> release = new ArrayList<ContainerId>(); ArrayList<ContainerId> release = new ArrayList<ContainerId>();
AllocateRequest allocateRequest = AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
recordFactory.newRecordInstance(AllocateRequest.class); appAttempt.getAppAttemptId(), 0, 0F, ask, release);
allocateRequest.setApplicationAttemptId(appAttempt.getAppAttemptId());
allocateRequest.setResponseId(0);
allocateRequest.addAllAsks(ask);
allocateRequest.addAllReleases(release);
List<Container> allocatedContainers = scheduler.allocate(allocateRequest) List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getAllocatedContainers(); .getAMResponse().getAllocatedContainers();