From beec374542da88416c4b3cb339c3680e9a844463 Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Thu, 15 Dec 2011 08:58:51 +0000 Subject: [PATCH] MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster (Anupam Seth via mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1214662 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapred/ClientServiceDelegate.java | 103 +++++++++++++----- .../mapred/TestClientServiceDelegate.java | 73 ++++++++++++- .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../src/main/resources/yarn-default.xml | 6 + 5 files changed, 162 insertions(+), 28 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e68a62cc4e..d7cb4f657a 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -95,6 +95,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-2863. Support web services for YARN and MR components. (Thomas Graves via vinodkv) + MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster + (Anupam Seth via mahadev) + IMPROVEMENTS MAPREDUCE-3297. 
Moved log related components into yarn-common so that diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java index 71ea84bb8c..3b6fc9f618 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ClientServiceDelegate.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -156,30 +157,37 @@ private MRClientProtocol getProxy() throws YarnRemoteException { application = rm.getApplicationReport(appId); continue; } - UserGroupInformation newUgi = UserGroupInformation.createRemoteUser( - UserGroupInformation.getCurrentUser().getUserName()); - serviceAddr = application.getHost() + ":" + application.getRpcPort(); - if (UserGroupInformation.isSecurityEnabled()) { - String clientTokenEncoded = application.getClientToken(); - Token clientToken = - new Token(); - clientToken.decodeFromUrlString(clientTokenEncoded); - // RPC layer client expects ip:port as service for tokens - InetSocketAddress addr = NetUtils.createSocketAddr(application - .getHost(), application.getRpcPort()); - clientToken.setService(new Text(addr.getAddress().getHostAddress() - + ":" + addr.getPort())); - newUgi.addToken(clientToken); - 
} - LOG.info("The url to track the job: " + application.getTrackingUrl()); - LOG.debug("Connecting to " + serviceAddr); - final String tempStr = serviceAddr; - realProxy = newUgi.doAs(new PrivilegedExceptionAction() { - @Override - public MRClientProtocol run() throws IOException { - return instantiateAMProxy(tempStr); + if(!conf.getBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, false)) { + UserGroupInformation newUgi = UserGroupInformation.createRemoteUser( + UserGroupInformation.getCurrentUser().getUserName()); + serviceAddr = application.getHost() + ":" + application.getRpcPort(); + if (UserGroupInformation.isSecurityEnabled()) { + String clientTokenEncoded = application.getClientToken(); + Token clientToken = + new Token(); + clientToken.decodeFromUrlString(clientTokenEncoded); + // RPC layer client expects ip:port as service for tokens + InetSocketAddress addr = NetUtils.createSocketAddr(application + .getHost(), application.getRpcPort()); + clientToken.setService(new Text(addr.getAddress().getHostAddress() + + ":" + addr.getPort())); + newUgi.addToken(clientToken); } - }); + LOG.info("The url to track the job: " + application.getTrackingUrl()); + LOG.debug("Connecting to " + serviceAddr); + final String tempStr = serviceAddr; + realProxy = newUgi.doAs(new PrivilegedExceptionAction() { + @Override + public MRClientProtocol run() throws IOException { + return instantiateAMProxy(tempStr); + } + }); + } else { + logApplicationReportInfo(application); + LOG.info("Network ACL closed to AM for job " + jobId + + ". 
Redirecting to job history server."); + return checkAndGetHSProxy(null, JobState.RUNNING); + } return realProxy; } catch (IOException e) { //possibly the AM has crashed @@ -240,10 +248,55 @@ public MRClientProtocol run() throws IOException { return realProxy; } + private void logApplicationReportInfo(ApplicationReport application) { + if(application == null) { + return; + } + LOG.info("AppId: " + application.getApplicationId() + + " # reserved containers: " + + application.getApplicationResourceUsageReport().getNumReservedContainers() + + " # used containers: " + + application.getApplicationResourceUsageReport().getNumUsedContainers() + + " Needed resources (memory): " + + application.getApplicationResourceUsageReport().getNeededResources().getMemory() + + " Reserved resources (memory): " + + application.getApplicationResourceUsageReport().getReservedResources().getMemory() + + " Used resources (memory): " + + application.getApplicationResourceUsageReport().getUsedResources().getMemory() + + " Diagnostics: " + + application.getDiagnostics() + + " Start time: " + + application.getStartTime() + + " Finish time: " + + application.getFinishTime() + + " Host: " + + application.getHost() + + " Name: " + + application.getName() + + " Orig. tracking url: " + + application.getOriginalTrackingUrl() + + " Queue: " + + application.getQueue() + + " RPC port: " + + application.getRpcPort() + + " Tracking url: " + + application.getTrackingUrl() + + " User: " + + application.getUser() + + " Client token: " + + application.getClientToken() + + " Final appl. status: " + + application.getFinalApplicationStatus() + + " Yarn appl. 
state: " + + application.getYarnApplicationState() + ); + } + private MRClientProtocol checkAndGetHSProxy( ApplicationReport applicationReport, JobState state) { if (null == historyServerProxy) { - LOG.warn("Job History Server is not configured."); + LOG.warn("Job History Server is not configured or " + + "job information not yet available on History Server."); return getNotRunningJob(applicationReport, state); } return historyServerProxy; @@ -452,4 +505,4 @@ public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID) throw new IOException("Cannot get log path for a in-progress job"); } } -} \ No newline at end of file +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java index b17cb427d3..7bd94a7268 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestClientServiceDelegate.java @@ -22,6 +22,8 @@ import static org.mockito.Mockito.*; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import junit.framework.Assert; @@ -31,8 +33,13 @@ import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse; +import 
org.apache.hadoop.mapreduce.v2.api.records.Counter; +import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup; +import org.apache.hadoop.mapreduce.v2.api.records.Counters; import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; @@ -45,15 +52,30 @@ import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.Records; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Tests for ClientServiceDelegate.java */ +@RunWith(value = Parameterized.class) public class TestClientServiceDelegate { private JobID oldJobId = JobID.forName("job_1315895242400_2"); private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter .toYarn(oldJobId); + private boolean isAMReachableFromClient; + + public TestClientServiceDelegate(boolean isAMReachableFromClient) { + this.isAMReachableFromClient = isAMReachableFromClient; + } + + @Parameters + public static Collection data() { + Object[][] data = new Object[][] { { true }, { false } }; + return Arrays.asList(data); + } @Test public void testUnknownAppInRM() throws Exception { @@ -150,9 +172,30 @@ public void testJobReportFromHistoryServer() throws Exception { Assert.assertEquals(1.0f, jobStatus.getMapProgress()); Assert.assertEquals(1.0f, jobStatus.getReduceProgress()); } + + @Test + public void testCountersFromHistoryServer() throws Exception { + MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); + when(historyServerProxy.getCounters(getCountersRequest())).thenReturn( + getCountersResponseFromHistoryServer()); + ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class); + when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId())) + .thenReturn(null); + ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate( + 
historyServerProxy, rm); + + Counters counters = TypeConverter.toYarn(clientServiceDelegate.getJobCounters(oldJobId)); + Assert.assertNotNull(counters); + Assert.assertEquals(1001, counters.getCounterGroup("dummyCounters").getCounter("dummyCounter").getValue()); + } @Test public void testReconnectOnAMRestart() throws IOException { + //test not applicable when AM not reachable + //as instantiateAMProxy is not called at all + if(!isAMReachableFromClient) { + return; + } MRClientProtocol historyServerProxy = mock(MRClientProtocol.class); @@ -186,7 +229,7 @@ public void testReconnectOnAMRestart() throws IOException { MRClientProtocol secondGenAMProxy = mock(MRClientProtocol.class); when(secondGenAMProxy.getJobReport(any(GetJobReportRequest.class))) .thenReturn(jobReportResponse2); - + ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate( historyServerProxy, rmDelegate)); // First time, connection should be to AM1, then to AM2. Further requests @@ -210,13 +253,13 @@ public void testReconnectOnAMRestart() throws IOException { verify(clientServiceDelegate, times(2)).instantiateAMProxy( any(String.class)); } - + private GetJobReportRequest getJobReportRequest() { GetJobReportRequest request = Records.newRecord(GetJobReportRequest.class); request.setJobId(jobId); return request; } - + private GetJobReportResponse getJobReportResponse() { GetJobReportResponse jobReportResponse = Records .newRecord(GetJobReportResponse.class); @@ -227,6 +270,12 @@ private GetJobReportResponse getJobReportResponse() { return jobReportResponse; } + private GetCountersRequest getCountersRequest() { + GetCountersRequest request = Records.newRecord(GetCountersRequest.class); + request.setJobId(jobId); + return request; + } + private ApplicationReport getFinishedApplicationReport() { return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId( 1234, 5), "user", "queue", "appname", "host", 124, null, @@ -251,6 +300,7 @@ private ClientServiceDelegate 
getClientServiceDelegate( MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) { Configuration conf = new YarnConfiguration(); conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); + conf.setBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, !isAMReachableFromClient); ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate( conf, rm, oldJobId, historyServerProxy); return clientServiceDelegate; @@ -269,4 +319,21 @@ private GetJobReportResponse getJobReportResponseFromHistoryServer() { jobReportResponse.setJobReport(jobReport); return jobReportResponse; } + + private GetCountersResponse getCountersResponseFromHistoryServer() { + GetCountersResponse countersResponse = Records + .newRecord(GetCountersResponse.class); + Counter counter = Records.newRecord(Counter.class); + CounterGroup counterGroup = Records.newRecord(CounterGroup.class); + Counters counters = Records.newRecord(Counters.class); + counter.setDisplayName("dummyCounter"); + counter.setName("dummyCounter"); + counter.setValue(1001); + counterGroup.setName("dummyCounters"); + counterGroup.setDisplayName("dummyCounters"); + counterGroup.setCounter("dummyCounter", counter); + counters.setCounterGroup("dummyCounters", counterGroup); + countersResponse.setCounters(counters); + return countersResponse; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1133d78816..5e73d753b5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -145,6 +145,11 @@ public class YarnConfiguration extends Configuration { /** ACL used in case none is found. Allows nothing. 
*/ public static final String DEFAULT_YARN_APP_ACL = " "; + /** RM-AM ACL disabled. **/ + public static final String RM_AM_NETWORK_ACL_CLOSED = + RM_PREFIX + "am.acl.disabled"; + public static final boolean DEFAULT_RM_AM_NETWORK_ACL_CLOSED = false; + /** The address of the RM admin interface.*/ public static final String RM_ADMIN_ADDRESS = RM_PREFIX + "admin.address"; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml index 25e25cd0a7..f15250a738 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml @@ -117,6 +117,12 @@ <value>*</value> </property> + <property> + <description>Network ACL to AM closed.</description> + <name>yarn.resourcemanager.am.acl.disabled</name> + <value>false</value> + </property> + <property> <description>The address of the RM admin interface.</description> <name>yarn.resourcemanager.admin.address</name>