From 24e47ebc18a62f6de351ec1ab8b9816999cc3267 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Tue, 28 Aug 2012 00:40:02 +0000 Subject: [PATCH] MAPREDUCE-4580. Change MapReduce to use the yarn-client module. (Contributed by Vinod Kumar Vavilapalli) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1377922 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop-mapreduce-client-common/pom.xml | 4 + .../hadoop/mapred/ResourceMgrDelegate.java | 212 ++---------------- .../org/apache/hadoop/mapred/YARNRunner.java | 2 +- .../mapred/TestResourceMgrDelegate.java | 20 +- .../TestYarnClientProtocolProvider.java | 15 +- .../hadoop/mapreduce/v2/TestYARNRunner.java | 9 +- 7 files changed, 64 insertions(+), 201 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 37c9591995..882f50f22c 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -234,6 +234,9 @@ Release 2.1.0-alpha - Unreleased MAPREDUCE-3289. Make use of fadvise in the NM's shuffle handler. (Todd Lipcon and Siddharth Seth via sseth) + MAPREDUCE-4580. Change MapReduce to use the yarn-client module. 
+ (Vinod Kumar Vavilapalli via sseth) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml index 1e9a509ff5..6b22094bcb 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml @@ -37,6 +37,10 @@ org.apache.hadoop hadoop-yarn-common + + org.apache.hadoop + hadoop-yarn-client + org.apache.hadoop hadoop-mapreduce-client-core diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java index 62b608aca4..5cf2a1d480 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java @@ -19,9 +19,6 @@ package org.apache.hadoop.mapred; import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -41,75 +38,29 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; -import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; -import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; -import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; -import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRemoteException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.ProtoUtils; +import org.apache.hadoop.yarn.client.YarnClientImpl; - -// TODO: This should be part of something like yarn-client. 
-public class ResourceMgrDelegate { +public class ResourceMgrDelegate extends YarnClientImpl { private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class); - private final InetSocketAddress rmAddress; private YarnConfiguration conf; - ClientRMProtocol applicationsManager; + private GetNewApplicationResponse application; private ApplicationId applicationId; - private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); /** * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}. * @param conf the configuration object. */ public ResourceMgrDelegate(YarnConfiguration conf) { + super(); this.conf = conf; - YarnRPC rpc = YarnRPC.create(this.conf); - this.rmAddress = getRmAddress(conf); - LOG.debug("Connecting to ResourceManager at " + rmAddress); - applicationsManager = - (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, - rmAddress, this.conf); - LOG.debug("Connected to ResourceManager at " + rmAddress); - } - - /** - * Used for injecting applicationsManager, mostly for testing. - * @param conf the configuration object - * @param applicationsManager the handle to talk the resource managers - * {@link ClientRMProtocol}. 
- */ - public ResourceMgrDelegate(YarnConfiguration conf, - ClientRMProtocol applicationsManager) { - this.conf = conf; - this.applicationsManager = applicationsManager; - this.rmAddress = getRmAddress(conf); - } - - private static InetSocketAddress getRmAddress(YarnConfiguration conf) { - return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_PORT); + init(conf); + start(); } public void cancelDelegationToken(Token arg0) @@ -117,26 +68,15 @@ public void cancelDelegationToken(Token arg0) return; } - public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { - GetClusterNodesRequest request = - recordFactory.newRecordInstance(GetClusterNodesRequest.class); - GetClusterNodesResponse response = - applicationsManager.getClusterNodes(request); - return TypeConverter.fromYarnNodes(response.getNodeReports()); + return TypeConverter.fromYarnNodes(super.getNodeReports()); } - public JobStatus[] getAllJobs() throws IOException, InterruptedException { - GetAllApplicationsRequest request = - recordFactory.newRecordInstance(GetAllApplicationsRequest.class); - GetAllApplicationsResponse response = - applicationsManager.getAllApplications(request); - return TypeConverter.fromYarnApps(response.getApplicationList(), this.conf); + return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf); } - public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException { // TODO: Implement getBlacklistedTrackers @@ -144,128 +84,56 @@ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, return new TaskTrackerInfo[0]; } - public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { - GetClusterMetricsRequest request = recordFactory.newRecordInstance(GetClusterMetricsRequest.class); - GetClusterMetricsResponse response = applicationsManager.getClusterMetrics(request); - YarnClusterMetrics metrics = 
response.getClusterMetrics(); + YarnClusterMetrics metrics = super.getYarnClusterMetrics(); ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1, metrics.getNumNodeManagers(), 0, 0); return oldMetrics; } - @SuppressWarnings("rawtypes") - public Token getDelegationToken(Text renewer) - throws IOException, InterruptedException { - /* get the token from RM */ - org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest - rmDTRequest = recordFactory.newRecordInstance( - org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest.class); - rmDTRequest.setRenewer(renewer.toString()); - org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse - response = applicationsManager.getDelegationToken(rmDTRequest); - DelegationToken yarnToken = response.getRMDelegationToken(); - return ProtoUtils.convertFromProtoFormat(yarnToken, rmAddress); + public Token getDelegationToken(Text renewer) throws IOException, + InterruptedException { + return ProtoUtils.convertFromProtoFormat( + super.getRMDelegationToken(renewer), rmAddress); } - public String getFilesystemName() throws IOException, InterruptedException { return FileSystem.get(conf).getUri().toString(); } public JobID getNewJobID() throws IOException, InterruptedException { - GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class); - applicationId = applicationsManager.getNewApplication(request).getApplicationId(); + this.application = super.getNewApplication(); + this.applicationId = this.application.getApplicationId(); return TypeConverter.fromYarn(applicationId); } - private static final String ROOT = "root"; - - private GetQueueInfoRequest getQueueInfoRequest(String queueName, - boolean includeApplications, boolean includeChildQueues, boolean recursive) { - GetQueueInfoRequest request = - recordFactory.newRecordInstance(GetQueueInfoRequest.class); - 
request.setQueueName(queueName); - request.setIncludeApplications(includeApplications); - request.setIncludeChildQueues(includeChildQueues); - request.setRecursive(recursive); - return request; - - } - public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { - GetQueueInfoRequest request = - getQueueInfoRequest(queueName, true, false, false); - recordFactory.newRecordInstance(GetQueueInfoRequest.class); return TypeConverter.fromYarn( - applicationsManager.getQueueInfo(request).getQueueInfo(), this.conf); + super.getQueueInfo(queueName), this.conf); } - - private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent, - List queues, - boolean recursive) { - List childQueues = - parent.getChildQueues(); - - for (org.apache.hadoop.yarn.api.records.QueueInfo child : childQueues) { - queues.add(child); - if(recursive) { - getChildQueues(child, queues, recursive); - } - } - } - public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { - GetQueueUserAclsInfoRequest request = - recordFactory.newRecordInstance(GetQueueUserAclsInfoRequest.class); - List userAcls = - applicationsManager.getQueueUserAcls(request).getUserAclsInfoList(); - return TypeConverter.fromYarnQueueUserAclsInfo(userAcls); + return TypeConverter.fromYarnQueueUserAclsInfo(super + .getQueueAclsInfo()); } - public QueueInfo[] getQueues() throws IOException, InterruptedException { - List queues = - new ArrayList(); - - org.apache.hadoop.yarn.api.records.QueueInfo rootQueue = - applicationsManager.getQueueInfo( - getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo(); - getChildQueues(rootQueue, queues, true); - - return TypeConverter.fromYarnQueueInfo(queues, this.conf); + return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf); } - public QueueInfo[] getRootQueues() throws IOException, InterruptedException { - List queues = - new ArrayList(); - - org.apache.hadoop.yarn.api.records.QueueInfo rootQueue = - 
applicationsManager.getQueueInfo( - getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo(); - getChildQueues(rootQueue, queues, false); - - return TypeConverter.fromYarnQueueInfo(queues, this.conf); + return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf); } public QueueInfo[] getChildQueues(String parent) throws IOException, InterruptedException { - List queues = - new ArrayList(); - - org.apache.hadoop.yarn.api.records.QueueInfo parentQueue = - applicationsManager.getQueueInfo( - getQueueInfoRequest(parent, false, true, false)).getQueueInfo(); - getChildQueues(parentQueue, queues, true); - - return TypeConverter.fromYarnQueueInfo(queues, this.conf); + return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent), + this.conf); } public String getStagingAreaDir() throws IOException, InterruptedException { @@ -307,40 +175,6 @@ public long renewDelegationToken(Token arg0) return 0; } - - public ApplicationId submitApplication( - ApplicationSubmissionContext appContext) - throws IOException { - appContext.setApplicationId(applicationId); - SubmitApplicationRequest request = - recordFactory.newRecordInstance(SubmitApplicationRequest.class); - request.setApplicationSubmissionContext(appContext); - applicationsManager.submitApplication(request); - LOG.info("Submitted application " + applicationId + " to ResourceManager" + - " at " + rmAddress); - return applicationId; - } - - public void killApplication(ApplicationId applicationId) throws IOException { - KillApplicationRequest request = - recordFactory.newRecordInstance(KillApplicationRequest.class); - request.setApplicationId(applicationId); - applicationsManager.forceKillApplication(request); - LOG.info("Killing application " + applicationId); - } - - - public ApplicationReport getApplicationReport(ApplicationId appId) - throws YarnRemoteException { - GetApplicationReportRequest request = recordFactory - .newRecordInstance(GetApplicationReportRequest.class); - 
request.setApplicationId(appId); - GetApplicationReportResponse response = applicationsManager - .getApplicationReport(request); - ApplicationReport applicationReport = response.getApplicationReport(); - return applicationReport; - } - public ApplicationId getApplicationId() { return applicationId; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 74ae6446cd..f3271768ae 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -89,7 +89,7 @@ /** * This class enables the current JobClient (0.22 hadoop) to run on YARN. 
*/ -@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) +@SuppressWarnings({ "rawtypes", "unchecked" }) public class YARNRunner implements ClientProtocol { private static final Log LOG = LogFactory.getLog(YARNRunner.class); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java index ad1ebc9d80..cd325a1d60 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java @@ -50,7 +50,7 @@ public class TestResourceMgrDelegate { */ @Test public void testGetRootQueues() throws IOException, InterruptedException { - ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class); + final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class); GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class); org.apache.hadoop.yarn.api.records.QueueInfo queueInfo = Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class); @@ -59,12 +59,17 @@ public void testGetRootQueues() throws IOException, InterruptedException { GetQueueInfoRequest.class))).thenReturn(response); ResourceMgrDelegate delegate = new ResourceMgrDelegate( - new YarnConfiguration(), applicationsManager); + new YarnConfiguration()) { + @Override + public synchronized void start() { + this.rmClient = applicationsManager; + } + }; delegate.getRootQueues(); ArgumentCaptor argument = ArgumentCaptor.forClass(GetQueueInfoRequest.class); - Mockito.verify(delegate.applicationsManager).getQueueInfo( + Mockito.verify(applicationsManager).getQueueInfo( 
argument.capture()); Assert.assertTrue("Children of root queue not requested", @@ -75,7 +80,7 @@ public void testGetRootQueues() throws IOException, InterruptedException { @Test public void tesAllJobs() throws Exception { - ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class); + final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class); GetAllApplicationsResponse allApplicationsResponse = Records .newRecord(GetAllApplicationsResponse.class); List applications = new ArrayList(); @@ -93,7 +98,12 @@ public void tesAllJobs() throws Exception { .any(GetAllApplicationsRequest.class))).thenReturn( allApplicationsResponse); ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate( - new YarnConfiguration(), applicationsManager); + new YarnConfiguration()) { + @Override + public synchronized void start() { + this.rmClient = applicationsManager; + } + }; JobStatus[] allJobs = resourceMgrDelegate.getAllJobs(); Assert.assertEquals(State.FAILED, allJobs[0].getState()); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java index 7052c76902..1bbffb8fde 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java @@ -18,9 +18,10 @@ package org.apache.hadoop.mapreduce; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.when; + import java.io.IOException; import 
java.nio.ByteBuffer; @@ -104,14 +105,20 @@ public void testClusterGetDelegationToken() throws Exception { rmDTToken.setPassword(ByteBuffer.wrap("testcluster".getBytes())); rmDTToken.setService("0.0.0.0:8032"); getDTResponse.setRMDelegationToken(rmDTToken); - ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class); + final ClientRMProtocol cRMProtocol = mock(ClientRMProtocol.class); when(cRMProtocol.getDelegationToken(any( GetDelegationTokenRequest.class))).thenReturn(getDTResponse); ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate( - new YarnConfiguration(conf), cRMProtocol); + new YarnConfiguration(conf)) { + @Override + public synchronized void start() { + this.rmClient = cRMProtocol; + } + }; yrunner.setResourceMgrDelegate(rmgrDelegate); Token t = cluster.getDelegationToken(new Text(" ")); - assertTrue("Testclusterkind".equals(t.getKind().toString())); + assertTrue("Token kind is instead " + t.getKind().toString(), + "Testclusterkind".equals(t.getKind().toString())); } finally { if (cluster != null) { cluster.close(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java index bacf164863..3bbd1324bd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestYARNRunner.java @@ -177,8 +177,13 @@ public void testJobSubmissionFailure() throws Exception { @Test public void testResourceMgrDelegate() throws Exception { /* we not want a mock of resourcemgr deleagte */ - ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); - ResourceMgrDelegate delegate = 
new ResourceMgrDelegate(conf, clientRMProtocol); + final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); + ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) { + @Override + public synchronized void start() { + this.rmClient = clientRMProtocol; + } + }; /* make sure kill calls finish application master */ when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class))) .thenReturn(null);