MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster (Anupam Seth via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1214662 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2011-12-15 08:58:51 +00:00
parent 14ba2e65c9
commit beec374542
5 changed files with 162 additions and 28 deletions

View File

@ -95,6 +95,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-2863. Support web services for YARN and MR components. (Thomas
Graves via vinodkv)
MAPREDUCE-3251. Network ACLs can prevent some clients to talk to MR ApplicationMaster
(Anupam Seth via mahadev)
IMPROVEMENTS
MAPREDUCE-3297. Moved log related components into yarn-common so that

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -156,6 +157,7 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
application = rm.getApplicationReport(appId);
continue;
}
if(!conf.getBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, false)) {
UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
UserGroupInformation.getCurrentUser().getUserName());
serviceAddr = application.getHost() + ":" + application.getRpcPort();
@ -180,6 +182,12 @@ public MRClientProtocol run() throws IOException {
return instantiateAMProxy(tempStr);
}
});
} else {
logApplicationReportInfo(application);
LOG.info("Network ACL closed to AM for job " + jobId
+ ". Redirecting to job history server.");
return checkAndGetHSProxy(null, JobState.RUNNING);
}
return realProxy;
} catch (IOException e) {
//possibly the AM has crashed
@ -240,10 +248,55 @@ public MRClientProtocol run() throws IOException {
return realProxy;
}
private void logApplicationReportInfo(ApplicationReport application) {
if(application == null) {
return;
}
LOG.info("AppId: " + application.getApplicationId()
+ " # reserved containers: "
+ application.getApplicationResourceUsageReport().getNumReservedContainers()
+ " # used containers: "
+ application.getApplicationResourceUsageReport().getNumUsedContainers()
+ " Needed resources (memory): "
+ application.getApplicationResourceUsageReport().getNeededResources().getMemory()
+ " Reserved resources (memory): "
+ application.getApplicationResourceUsageReport().getReservedResources().getMemory()
+ " Used resources (memory): "
+ application.getApplicationResourceUsageReport().getUsedResources().getMemory()
+ " Diagnostics: "
+ application.getDiagnostics()
+ " Start time: "
+ application.getStartTime()
+ " Finish time: "
+ application.getFinishTime()
+ " Host: "
+ application.getHost()
+ " Name: "
+ application.getName()
+ " Orig. tracking url: "
+ application.getOriginalTrackingUrl()
+ " Queue: "
+ application.getQueue()
+ " RPC port: "
+ application.getRpcPort()
+ " Tracking url: "
+ application.getTrackingUrl()
+ " User: "
+ application.getUser()
+ " Client token: "
+ application.getClientToken()
+ " Final appl. status: "
+ application.getFinalApplicationStatus()
+ " Yarn appl. state: "
+ application.getYarnApplicationState()
);
}
private MRClientProtocol checkAndGetHSProxy(
ApplicationReport applicationReport, JobState state) {
if (null == historyServerProxy) {
LOG.warn("Job History Server is not configured.");
LOG.warn("Job History Server is not configured or " +
"job information not yet available on History Server.");
return getNotRunningJob(applicationReport, state);
}
return historyServerProxy;

View File

@ -22,6 +22,8 @@
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import junit.framework.Assert;
@ -31,8 +33,13 @@
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
import org.apache.hadoop.mapreduce.v2.api.records.Counter;
import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@ -45,15 +52,30 @@
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* Tests for ClientServiceDelegate.java
*/
@RunWith(value = Parameterized.class)
public class TestClientServiceDelegate {
private JobID oldJobId = JobID.forName("job_1315895242400_2");
private org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter
.toYarn(oldJobId);
private boolean isAMReachableFromClient;
public TestClientServiceDelegate(boolean isAMReachableFromClient) {
this.isAMReachableFromClient = isAMReachableFromClient;
}
@Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] { { true }, { false } };
return Arrays.asList(data);
}
@Test
public void testUnknownAppInRM() throws Exception {
@ -151,8 +173,29 @@ public void testJobReportFromHistoryServer() throws Exception {
Assert.assertEquals(1.0f, jobStatus.getReduceProgress());
}
@Test
public void testCountersFromHistoryServer() throws Exception {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getCounters(getCountersRequest())).thenReturn(
getCountersResponseFromHistoryServer());
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
.thenReturn(null);
ClientServiceDelegate clientServiceDelegate = getClientServiceDelegate(
historyServerProxy, rm);
Counters counters = TypeConverter.toYarn(clientServiceDelegate.getJobCounters(oldJobId));
Assert.assertNotNull(counters);
Assert.assertEquals(1001, counters.getCounterGroup("dummyCounters").getCounter("dummyCounter").getValue());
}
@Test
public void testReconnectOnAMRestart() throws IOException {
//test not applicable when AM not reachable
//as instantiateAMProxy is not called at all
if(!isAMReachableFromClient) {
return;
}
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
@ -227,6 +270,12 @@ private GetJobReportResponse getJobReportResponse() {
return jobReportResponse;
}
private GetCountersRequest getCountersRequest() {
GetCountersRequest request = Records.newRecord(GetCountersRequest.class);
request.setJobId(jobId);
return request;
}
private ApplicationReport getFinishedApplicationReport() {
return BuilderUtils.newApplicationReport(BuilderUtils.newApplicationId(
1234, 5), "user", "queue", "appname", "host", 124, null,
@ -251,6 +300,7 @@ private ClientServiceDelegate getClientServiceDelegate(
MRClientProtocol historyServerProxy, ResourceMgrDelegate rm) {
Configuration conf = new YarnConfiguration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(YarnConfiguration.RM_AM_NETWORK_ACL_CLOSED, !isAMReachableFromClient);
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rm, oldJobId, historyServerProxy);
return clientServiceDelegate;
@ -269,4 +319,21 @@ private GetJobReportResponse getJobReportResponseFromHistoryServer() {
jobReportResponse.setJobReport(jobReport);
return jobReportResponse;
}
private GetCountersResponse getCountersResponseFromHistoryServer() {
GetCountersResponse countersResponse = Records
.newRecord(GetCountersResponse.class);
Counter counter = Records.newRecord(Counter.class);
CounterGroup counterGroup = Records.newRecord(CounterGroup.class);
Counters counters = Records.newRecord(Counters.class);
counter.setDisplayName("dummyCounter");
counter.setName("dummyCounter");
counter.setValue(1001);
counterGroup.setName("dummyCounters");
counterGroup.setDisplayName("dummyCounters");
counterGroup.setCounter("dummyCounter", counter);
counters.setCounterGroup("dummyCounters", counterGroup);
countersResponse.setCounters(counters);
return countersResponse;
}
}

View File

@ -145,6 +145,11 @@ public class YarnConfiguration extends Configuration {
/** ACL used in case none is found. Allows nothing. */
public static final String DEFAULT_YARN_APP_ACL = " ";
/** RM-AM ACL disabled. **/
public static final String RM_AM_NETWORK_ACL_CLOSED =
RM_PREFIX + "am.acl.disabled";
public static final boolean DEFAULT_RM_AM_NETWORK_ACL_CLOSED = false;
/** The address of the RM admin interface.*/
public static final String RM_ADMIN_ADDRESS =
RM_PREFIX + "admin.address";

View File

@ -117,6 +117,12 @@
<value>*</value>
</property>
<property>
<description>Network ACL to AM closed.</description>
<name>yarn.resourcemanager.am.acl.disabled</name>
<value>false</value>
</property>
<property>
<description>The address of the RM admin interface.</description>
<name>yarn.resourcemanager.admin.address</name>