diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 37c9591995..882f50f22c 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -234,6 +234,9 @@ Release 2.1.0-alpha - Unreleased
MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler.
(Todd Lipcon and Siddharth Seth via sseth)
+ MAPREDUCE-4580. Change MapReduce to use the yarn-client module.
+ (Vinod Kumar Vavilapalli via sseth)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
index 1e9a509ff5..6b22094bcb 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
@@ -37,6 +37,10 @@
org.apache.hadoop
hadoop-yarn-common
+
+ org.apache.hadoop
+ hadoop-yarn-client
+
org.apache.hadoop
hadoop-mapreduce-client-core
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
index 62b608aca4..5cf2a1d480 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
@@ -19,9 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,75 +38,29 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.hadoop.yarn.client.YarnClientImpl;
-
-// TODO: This should be part of something like yarn-client.
-public class ResourceMgrDelegate {
+public class ResourceMgrDelegate extends YarnClientImpl {
private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
- private final InetSocketAddress rmAddress;
private YarnConfiguration conf;
- ClientRMProtocol applicationsManager;
+ private GetNewApplicationResponse application;
private ApplicationId applicationId;
- private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
/**
* Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
* @param conf the configuration object.
*/
public ResourceMgrDelegate(YarnConfiguration conf) {
+ super();
this.conf = conf;
- YarnRPC rpc = YarnRPC.create(this.conf);
- this.rmAddress = getRmAddress(conf);
- LOG.debug("Connecting to ResourceManager at " + rmAddress);
- applicationsManager =
- (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
- rmAddress, this.conf);
- LOG.debug("Connected to ResourceManager at " + rmAddress);
- }
-
- /**
- * Used for injecting applicationsManager, mostly for testing.
- * @param conf the configuration object
- * @param applicationsManager the handle to talk the resource managers
- * {@link ClientRMProtocol}.
- */
- public ResourceMgrDelegate(YarnConfiguration conf,
- ClientRMProtocol applicationsManager) {
- this.conf = conf;
- this.applicationsManager = applicationsManager;
- this.rmAddress = getRmAddress(conf);
- }
-
- private static InetSocketAddress getRmAddress(YarnConfiguration conf) {
- return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_ADDRESS,
- YarnConfiguration.DEFAULT_RM_PORT);
+ init(conf);
+ start();
}
public void cancelDelegationToken(Token arg0)
@@ -117,26 +68,15 @@ public void cancelDelegationToken(Token arg0)
return;
}
-
public TaskTrackerInfo[] getActiveTrackers() throws IOException,
InterruptedException {
- GetClusterNodesRequest request =
- recordFactory.newRecordInstance(GetClusterNodesRequest.class);
- GetClusterNodesResponse response =
- applicationsManager.getClusterNodes(request);
- return TypeConverter.fromYarnNodes(response.getNodeReports());
+ return TypeConverter.fromYarnNodes(super.getNodeReports());
}
-
public JobStatus[] getAllJobs() throws IOException, InterruptedException {
- GetAllApplicationsRequest request =
- recordFactory.newRecordInstance(GetAllApplicationsRequest.class);
- GetAllApplicationsResponse response =
- applicationsManager.getAllApplications(request);
- return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf);
+ return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
}
-
public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
InterruptedException {
// TODO: Implement getBlacklistedTrackers
@@ -144,128 +84,56 @@ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException,
return new TaskTrackerInfo[0];
}
-
public ClusterMetrics getClusterMetrics() throws IOException,
InterruptedException {
- GetClusterMetricsRequest request = recordFactory.newRecordInstance(GetClusterMetricsRequest.class);
- GetClusterMetricsResponse response = applicationsManager.getClusterMetrics(request);
- YarnClusterMetrics metrics = response.getClusterMetrics();
+ YarnClusterMetrics metrics = super.getYarnClusterMetrics();
ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1,
metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1,
metrics.getNumNodeManagers(), 0, 0);
return oldMetrics;
}
-
@SuppressWarnings("rawtypes")
- public Token getDelegationToken(Text renewer)
- throws IOException, InterruptedException {
- /* get the token from RM */
- org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest
- rmDTRequest = recordFactory.newRecordInstance(
- org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest.class);
- rmDTRequest.setRenewer(renewer.toString());
- org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse
- response = applicationsManager.getDelegationToken(rmDTRequest);
- DelegationToken yarnToken = response.getRMDelegationToken();
- return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress);
+ public Token getDelegationToken(Text renewer) throws IOException,
+ InterruptedException {
+ return ProtoUtils.convertFromProtoFormat(
+ super.getRMDelegationToken(renewer), rmAddress);
}
-
public String getFilesystemName() throws IOException, InterruptedException {
return FileSystem.get(conf).getUri().toString();
}
public JobID getNewJobID() throws IOException, InterruptedException {
- GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class);
- applicationId = applicationsManager.getNewApplication(request).getApplicationId();
+ this.application = super.getNewApplication();
+ this.applicationId = this.application.getApplicationId();
return TypeConverter.fromYarn(applicationId);
}
- private static final String ROOT = "root";
-
- private GetQueueInfoRequest getQueueInfoRequest(String queueName,
- boolean includeApplications, boolean includeChildQueues, boolean recursive) {
- GetQueueInfoRequest request =
- recordFactory.newRecordInstance(GetQueueInfoRequest.class);
- request.setQueueName(queueName);
- request.setIncludeApplications(includeApplications);
- request.setIncludeChildQueues(includeChildQueues);
- request.setRecursive(recursive);
- return request;
-
- }
-
public QueueInfo getQueue(String queueName) throws IOException,
InterruptedException {
- GetQueueInfoRequest request =
- getQueueInfoRequest(queueName, true, false, false);
- recordFactory.newRecordInstance(GetQueueInfoRequest.class);
return TypeConverter.fromYarn(
- applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf);
+ super.getQueueInfo(queueName), this.conf);
}
-
- private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent,
- List queues,
- boolean recursive) {
- List childQueues =
- parent.getChildQueues();
-
- for (org.apache.hadoop.yarn.api.records.QueueInfo child : childQueues) {
- queues.add(child);
- if(recursive) {
- getChildQueues(child, queues, recursive);
- }
- }
- }
-
public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
InterruptedException {
- GetQueueUserAclsInfoRequest request =
- recordFactory.newRecordInstance(GetQueueUserAclsInfoRequest.class);
- List userAcls =
- applicationsManager.getQueueUserAcls(request).getUserAclsInfoList();
- return TypeConverter.fromYarnQueueUserAclsInfo(userAcls);
+ return TypeConverter.fromYarnQueueUserAclsInfo(super
+ .getQueueAclsInfo());
}
-
public QueueInfo[] getQueues() throws IOException, InterruptedException {
- List queues =
- new ArrayList();
-
- org.apache.hadoop.yarn.api.records.QueueInfo rootQueue =
- applicationsManager.getQueueInfo(
- getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
- getChildQueues(rootQueue, queues, true);
-
- return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+ return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
}
-
public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
- List queues =
- new ArrayList();
-
- org.apache.hadoop.yarn.api.records.QueueInfo rootQueue =
- applicationsManager.getQueueInfo(
- getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
- getChildQueues(rootQueue, queues, false);
-
- return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+ return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf);
}
public QueueInfo[] getChildQueues(String parent) throws IOException,
InterruptedException {
- List queues =
- new ArrayList();
-
- org.apache.hadoop.yarn.api.records.QueueInfo parentQueue =
- applicationsManager.getQueueInfo(
- getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
- getChildQueues(parentQueue, queues, true);
-
- return TypeConverter.fromYarnQueueInfo(queues, this.conf);
+ return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+ this.conf);
}
public String getStagingAreaDir() throws IOException, InterruptedException {
@@ -307,40 +175,6 @@ public long renewDelegationToken(Token arg0)
return 0;
}
-
- public ApplicationId submitApplication(
- ApplicationSubmissionContext appContext)
- throws IOException {
- appContext.setApplicationId(applicationId);
- SubmitApplicationRequest request =
- recordFactory.newRecordInstance(SubmitApplicationRequest.class);
- request.setApplicationSubmissionContext(appContext);
- applicationsManager.submitApplication(request);
- LOG.info("Submitted application " + applicationId + " to ResourceManager" +
- " at " + rmAddress);
- return applicationId;
- }
-
- public void killApplication(ApplicationId applicationId) throws IOException {
- KillApplicationRequest request =
- recordFactory.newRecordInstance(KillApplicationRequest.class);
- request.setApplicationId(applicationId);
- applicationsManager.forceKillApplication(request);
- LOG.info("Killing application " + applicationId);
- }
-
-
- public ApplicationReport getApplicationReport(ApplicationId appId)
- throws YarnRemoteException {
- GetApplicationReportRequest request = recordFactory
- .newRecordInstance(GetApplicationReportRequest.class);
- request.setApplicationId(appId);
- GetApplicationReportResponse response = applicationsManager
- .getApplicationReport(request);
- ApplicationReport applicationReport = response.getApplicationReport();
- return applicationReport;
- }
-
public ApplicationId getApplicationId() {
return applicationId;
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index 74ae6446cd..f3271768ae 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -89,7 +89,7 @@
/**
* This class enables the current JobClient (0.22 hadoop) to run on YARN.
*/
-@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class YARNRunner implements ClientProtocol {
private static final Log LOG = LogFactory.getLog(YARNRunner.class);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
index ad1ebc9d80..cd325a1d60 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
@@ -50,7 +50,7 @@ public class TestResourceMgrDelegate {
*/
@Test
public void testGetRootQueues() throws IOException, InterruptedException {
- ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
+ final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
@@ -59,12 +59,17 @@ public void testGetRootQueues() throws IOException, InterruptedException {
GetQueueInfoRequest.class))).thenReturn(response);
ResourceMgrDelegate delegate = new ResourceMgrDelegate(
- new YarnConfiguration(), applicationsManager);
+ new YarnConfiguration()) {
+ @Override
+ public synchronized void start() {
+ this.rmClient = applicationsManager;
+ }
+ };
delegate.getRootQueues();
ArgumentCaptor argument =
ArgumentCaptor.forClass(GetQueueInfoRequest.class);
- Mockito.verify(delegate.applicationsManager).getQueueInfo(
+ Mockito.verify(applicationsManager).getQueueInfo(
argument.capture());
Assert.assertTrue("Children of root queue not requested",
@@ -75,7 +80,7 @@ public void testGetRootQueues() throws IOException, InterruptedException {
@Test
public void tesAllJobs() throws Exception {
- ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
+ final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
GetAllApplicationsResponse allApplicationsResponse = Records
.newRecord(GetAllApplicationsResponse.class);
List applications = new ArrayList();
@@ -93,7 +98,12 @@ public void tesAllJobs() throws Exception {
.any(GetAllApplicationsRequest.class))).thenReturn(
allApplicationsResponse);
ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate(
- new YarnConfiguration(), applicationsManager);
+ new YarnConfiguration()) {
+ @Override
+ public synchronized void start() {
+ this.rmClient = applicationsManager;
+ }
+ };
JobStatus[] allJobs = resourceMgrDelegate.getAllJobs();
Assert.assertEquals(State.FAILED, allJobs[0].getState());
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
index 7052c76902..1bbffb8fde 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
@@ -18,9 +18,10 @@
package org.apache.hadoop.mapreduce;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.any;
import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -104,14 +105,20 @@ public void testClusterGetDelegationToken() throws Exception {
rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes()));
rmDTToken.setService("0.0.0.0:8032");
getDTResponse.setRMDelegationToken(rmDTToken);
- ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
+ final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class);
when(cRMProtocol.getDelegationToken(any(
GetDelegationTokenRequest.class))).thenReturn(getDTResponse);
ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
- new YarnConfiguration(conf), cRMProtocol);
+ new YarnConfiguration(conf)) {
+ @Override
+ public synchronized void start() {
+ this.rmClient = cRMProtocol;
+ }
+ };
yrunner.setResourceMgrDelegate(rmgrDelegate);
Token t = cluster.getDelegationToken(new Text(" "));
- assertTrue("Testclusterkind".equals(t.getKind().toString()));
+ assertTrue("Token kind is instead " + t.getKind().toString(),
+ "Testclusterkind".equals(t.getKind().toString()));
} finally {
if (cluster != null) {
cluster.close();
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
index bacf164863..3bbd1324bd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java
@@ -177,8 +177,13 @@ public void testJobSubmissionFailure() throws Exception {
@Test
public void testResourceMgrDelegate() throws Exception {
/* we not want a mock of resourcemgr deleagte */
- ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
- ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol);
+ final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
+ ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
+ @Override
+ public synchronized void start() {
+ this.rmClient = clientRMProtocol;
+ }
+ };
/* make sure kill calls finish application master */
when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
.thenReturn(null);