YARN-7449. Split up class TestYarnClient to TestYarnClient and TestYarnClientImpl. Contributed by Szilard Nemeth.

This commit is contained in:
Miklos Szegedi 2018-06-20 11:40:56 -07:00
parent 55432b0981
commit bbbc7cc426
2 changed files with 323 additions and 266 deletions

View File

@ -18,41 +18,9 @@
package org.apache.hadoop.yarn.client.api.impl; package org.apache.hadoop.yarn.client.api.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.Thread.State;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
@ -74,7 +42,6 @@
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -92,7 +59,6 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.AHSClient; import org.apache.hadoop.yarn.client.api.AHSClient;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -100,7 +66,6 @@
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException; import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase; import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
@ -115,8 +80,28 @@
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.slf4j.event.Level; import org.slf4j.event.Level;
import java.io.IOException;
import java.lang.Thread.State;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** /**
* This class is to test class {@link YarnClient) and {@link YarnClientImpl}. * This class is to test class {@link YarnClient).
*/ */
public class TestYarnClient extends ParameterizedSchedulerTestBase { public class TestYarnClient extends ParameterizedSchedulerTestBase {
@ -146,17 +131,6 @@ public void testClientStop() {
rm.stop(); rm.stop();
} }
@Test
public void testStartWithTimelineV15() throws Exception {
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient();
client.init(conf);
client.start();
client.stop();
}
@Test @Test
public void testStartTimelineClientWithErrors() public void testStartTimelineClientWithErrors()
throws Exception { throws Exception {
@ -413,7 +387,7 @@ public void testApplicationType() throws Exception {
RMApp app = rm.submitApp(2000); RMApp app = rm.submitApp(2000);
RMApp app1 = RMApp app1 =
rm.submitApp(200, "name", "user", rm.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1, new HashMap<>(), false, "default", -1,
null, "MAPREDUCE"); null, "MAPREDUCE");
Assert.assertEquals("YARN", app.getApplicationType()); Assert.assertEquals("YARN", app.getApplicationType());
Assert.assertEquals("MAPREDUCE", app1.getApplicationType()); Assert.assertEquals("MAPREDUCE", app1.getApplicationType());
@ -427,7 +401,7 @@ public void testApplicationTypeLimit() throws Exception {
rm.start(); rm.start();
RMApp app1 = RMApp app1 =
rm.submitApp(200, "name", "user", rm.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default", -1, new HashMap<>(), false, "default", -1,
null, "MAPREDUCE-LENGTH-IS-20"); null, "MAPREDUCE-LENGTH-IS-20");
Assert.assertEquals("MAPREDUCE-LENGTH-IS-", app1.getApplicationType()); Assert.assertEquals("MAPREDUCE-LENGTH-IS-", app1.getApplicationType());
rm.stop(); rm.stop();
@ -444,7 +418,7 @@ public void testGetApplications() throws YarnException, IOException {
List<ApplicationReport> reports = client.getApplications(); List<ApplicationReport> reports = client.getApplications();
Assert.assertEquals(reports, expectedReports); Assert.assertEquals(reports, expectedReports);
Set<String> appTypes = new HashSet<String>(); Set<String> appTypes = new HashSet<>();
appTypes.add("YARN"); appTypes.add("YARN");
appTypes.add("NON-YARN"); appTypes.add("NON-YARN");
@ -601,7 +575,7 @@ public void testGetLabelsToNodes() throws YarnException, IOException {
Assert.assertEquals(labelsToNodes.size(), 3); Assert.assertEquals(labelsToNodes.size(), 3);
// Get labels to nodes for selected labels // Get labels to nodes for selected labels
Set<String> setLabels = new HashSet<String>(Arrays.asList("x", "z")); Set<String> setLabels = new HashSet<>(Arrays.asList("x", "z"));
expectedLabelsToNodes = expectedLabelsToNodes =
((MockYarnClient)client).getLabelsToNodesMap(setLabels); ((MockYarnClient)client).getLabelsToNodesMap(setLabels);
labelsToNodes = client.getLabelsToNodes(setLabels); labelsToNodes = client.getLabelsToNodes(setLabels);
@ -634,11 +608,11 @@ private static class MockYarnClient extends YarnClientImpl {
private ApplicationReport mockReport; private ApplicationReport mockReport;
private List<ApplicationReport> reports; private List<ApplicationReport> reports;
private HashMap<ApplicationId, List<ApplicationAttemptReport>> attempts = private HashMap<ApplicationId, List<ApplicationAttemptReport>> attempts =
new HashMap<ApplicationId, List<ApplicationAttemptReport>>(); new HashMap<>();
private HashMap<ApplicationAttemptId, List<ContainerReport>> containers = private HashMap<ApplicationAttemptId, List<ContainerReport>> containers =
new HashMap<ApplicationAttemptId, List<ContainerReport>>(); new HashMap<>();
private HashMap<ApplicationAttemptId, List<ContainerReport>> containersFromAHS = private HashMap<ApplicationAttemptId, List<ContainerReport>> containersFromAHS =
new HashMap<ApplicationAttemptId, List<ContainerReport>>(); new HashMap<>();
GetApplicationsResponse mockAppResponse = GetApplicationsResponse mockAppResponse =
mock(GetApplicationsResponse.class); mock(GetApplicationsResponse.class);
@ -739,9 +713,9 @@ private List<ApplicationReport> createAppReports() {
"user", "queue", "appname", "host", 124, null, "user", "queue", "appname", "host", 124, null,
YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, 0, YarnApplicationState.RUNNING, "diagnostics", "url", 0, 0, 0,
FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null); FinalApplicationStatus.SUCCEEDED, null, "N/A", 0.53789f, "YARN", null);
List<ApplicationReport> applicationReports = new ArrayList<ApplicationReport>(); List<ApplicationReport> applicationReports = new ArrayList<>();
applicationReports.add(newApplicationReport); applicationReports.add(newApplicationReport);
List<ApplicationAttemptReport> appAttempts = new ArrayList<ApplicationAttemptReport>(); List<ApplicationAttemptReport> appAttempts = new ArrayList<>();
ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance( ApplicationAttemptReport attempt = ApplicationAttemptReport.newInstance(
ApplicationAttemptId.newInstance(applicationId, 1), ApplicationAttemptId.newInstance(applicationId, 1),
"host", "host",
@ -767,7 +741,7 @@ private List<ApplicationReport> createAppReports() {
appAttempts.add(attempt1); appAttempts.add(attempt1);
attempts.put(applicationId, appAttempts); attempts.put(applicationId, appAttempts);
List<ContainerReport> containerReports = new ArrayList<ContainerReport>(); List<ContainerReport> containerReports = new ArrayList<>();
ContainerReport container = ContainerReport.newInstance( ContainerReport container = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null, ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678, NodeId.newInstance("host", 1234), Priority.UNDEFINED, 1234, 5678,
@ -785,7 +759,7 @@ private List<ApplicationReport> createAppReports() {
//add containers to be sent from AHS //add containers to be sent from AHS
List<ContainerReport> containerReportsForAHS = List<ContainerReport> containerReportsForAHS =
new ArrayList<ContainerReport>(); new ArrayList<>();
container = ContainerReport.newInstance( container = ContainerReport.newInstance(
ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null, ContainerId.newContainerId(attempt.getApplicationAttemptId(), 1), null,
@ -843,7 +817,7 @@ private List<ApplicationReport> getApplicationReports(
List<ApplicationReport> applicationReports, List<ApplicationReport> applicationReports,
Set<String> applicationTypes, EnumSet<YarnApplicationState> applicationStates) { Set<String> applicationTypes, EnumSet<YarnApplicationState> applicationStates) {
List<ApplicationReport> appReports = new ArrayList<ApplicationReport>(); List<ApplicationReport> appReports = new ArrayList<>();
for (ApplicationReport appReport : applicationReports) { for (ApplicationReport appReport : applicationReports) {
if (applicationTypes != null && !applicationTypes.isEmpty()) { if (applicationTypes != null && !applicationTypes.isEmpty()) {
if (!applicationTypes.contains(appReport.getApplicationType())) { if (!applicationTypes.contains(appReport.getApplicationType())) {
@ -878,9 +852,9 @@ public Map<String, Set<NodeId>> getLabelsToNodes(Set<String> labels)
} }
public Map<String, Set<NodeId>> getLabelsToNodesMap() { public Map<String, Set<NodeId>> getLabelsToNodesMap() {
Map<String, Set<NodeId>> map = new HashMap<String, Set<NodeId>>(); Map<String, Set<NodeId>> map = new HashMap<>();
Set<NodeId> setNodeIds = Set<NodeId> setNodeIds =
new HashSet<NodeId>(Arrays.asList( new HashSet<>(Arrays.asList(
NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0))); NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0)));
map.put("x", setNodeIds); map.put("x", setNodeIds);
map.put("y", setNodeIds); map.put("y", setNodeIds);
@ -889,8 +863,8 @@ public Map<String, Set<NodeId>> getLabelsToNodesMap() {
} }
public Map<String, Set<NodeId>> getLabelsToNodesMap(Set<String> labels) { public Map<String, Set<NodeId>> getLabelsToNodesMap(Set<String> labels) {
Map<String, Set<NodeId>> map = new HashMap<String, Set<NodeId>>(); Map<String, Set<NodeId>> map = new HashMap<>();
Set<NodeId> setNodeIds = new HashSet<NodeId>(Arrays.asList( Set<NodeId> setNodeIds = new HashSet<>(Arrays.asList(
NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0))); NodeId.newInstance("host1", 0), NodeId.newInstance("host2", 0)));
for (String label : labels) { for (String label : labels) {
map.put(label, setNodeIds); map.put(label, setNodeIds);
@ -907,8 +881,8 @@ public Map<NodeId, Set<String>> getNodeToLabels() throws YarnException,
} }
public Map<NodeId, Set<String>> getNodeToLabelsMap() { public Map<NodeId, Set<String>> getNodeToLabelsMap() {
Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>(); Map<NodeId, Set<String>> map = new HashMap<>();
Set<String> setNodeLabels = new HashSet<String>(Arrays.asList("x", "y")); Set<String> setNodeLabels = new HashSet<>(Arrays.asList("x", "y"));
map.put(NodeId.newInstance("host", 0), setNodeLabels); map.put(NodeId.newInstance("host", 0), setNodeLabels);
return map; return map;
} }
@ -985,7 +959,7 @@ public List<ContainerReport> getContainersReport(
private ContainerReport getContainer( private ContainerReport getContainer(
ContainerId containerId, ContainerId containerId,
HashMap<ApplicationAttemptId, List<ContainerReport>> containersToAppAttemptMapping) HashMap<ApplicationAttemptId, List<ContainerReport>> containersToAppAttemptMapping)
throws YarnException, IOException { throws YarnException {
List<ContainerReport> containersForAppAttempt = List<ContainerReport> containersForAppAttempt =
containersToAppAttemptMapping.get(containerId containersToAppAttemptMapping.get(containerId
.getApplicationAttemptId()); .getApplicationAttemptId());
@ -1119,174 +1093,6 @@ private void waitTillAccepted(YarnClient rmClient, ApplicationId appId,
Assert.assertEquals(unmanagedApplication, report.isUnmanagedApp()); Assert.assertEquals(unmanagedApplication, report.isUnmanagedApp());
} }
@Test
public void testAsyncAPIPollTimeout() {
testAsyncAPIPollTimeoutHelper(null, false);
testAsyncAPIPollTimeoutHelper(0L, true);
testAsyncAPIPollTimeoutHelper(1L, true);
}
private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout,
boolean expectedTimeoutEnforcement) {
YarnClientImpl client = new YarnClientImpl();
try {
Configuration conf = getConf();
if (valueForTimeout != null) {
conf.setLong(
YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
valueForTimeout);
}
client.init(conf);
Assert.assertEquals(
expectedTimeoutEnforcement, client.enforceAsyncAPITimeout());
} finally {
IOUtils.closeQuietly(client);
}
}
@Test
public void testBestEffortTimelineDelegationToken()
throws Exception {
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
YarnClientImpl client = spy(new YarnClientImpl() {
@Override
TimelineClient createTimelineClient() throws IOException, YarnException {
timelineClient = mock(TimelineClient.class);
when(timelineClient.getDelegationToken(any(String.class)))
.thenThrow(new RuntimeException("Best effort test exception"));
return timelineClient;
}
});
client.init(conf);
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
true);
client.serviceInit(conf);
client.getTimelineDelegationToken();
try {
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT, false);
client.serviceInit(conf);
client.getTimelineDelegationToken();
Assert.fail("Get delegation token should have thrown an exception");
} catch (IOException e) {
// Success
}
}
@Test
public void testAutomaticTimelineDelegationTokenLoading()
throws Exception {
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
TimelineDelegationTokenIdentifier timelineDT =
new TimelineDelegationTokenIdentifier();
final Token<TimelineDelegationTokenIdentifier> dToken =
new Token<TimelineDelegationTokenIdentifier>(
timelineDT.getBytes(), new byte[0], timelineDT.getKind(), new Text());
// create a mock client
YarnClientImpl client = spy(new YarnClientImpl() {
@Override
TimelineClient createTimelineClient() throws IOException, YarnException {
timelineClient = mock(TimelineClient.class);
when(timelineClient.getDelegationToken(any(String.class)))
.thenReturn(dToken);
return timelineClient;
}
@Override
protected void serviceStart() throws Exception {
rmClient = mock(ApplicationClientProtocol.class);
}
@Override
protected void serviceStop() throws Exception {
}
@Override
public ApplicationReport getApplicationReport(ApplicationId appId) {
ApplicationReport report = mock(ApplicationReport.class);
when(report.getYarnApplicationState())
.thenReturn(YarnApplicationState.RUNNING);
return report;
}
@Override
public boolean isSecurityEnabled() {
return true;
}
});
client.init(conf);
client.start();
try {
// when i == 0, timeline DT already exists, no need to get one more
// when i == 1, timeline DT doesn't exist, need to get one more
for (int i = 0; i < 2; ++i) {
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);
ApplicationId applicationId = ApplicationId.newInstance(0, i + 1);
when(context.getApplicationId()).thenReturn(applicationId);
DataOutputBuffer dob = new DataOutputBuffer();
Credentials credentials = new Credentials();
if (i == 0) {
credentials.addToken(client.timelineService, dToken);
}
credentials.writeTokenStorageToStream(dob);
ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
null, null, null, null, tokens, null);
when(context.getAMContainerSpec()).thenReturn(clc);
client.submitApplication(context);
if (i == 0) {
// GetTimelineDelegationToken shouldn't be called
verify(client, never()).getTimelineDelegationToken();
}
// In either way, token should be there
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
tokens = clc.getTokens();
if (tokens != null) {
dibb.reset(tokens);
credentials.readTokenStorageStream(dibb);
tokens.rewind();
}
Collection<Token<? extends TokenIdentifier>> dTokens =
credentials.getAllTokens();
Assert.assertEquals(1, dTokens.size());
Assert.assertEquals(dToken, dTokens.iterator().next());
}
} finally {
client.stop();
}
}
@Test
public void testParseTimelineDelegationTokenRenewer() throws Exception {
// Client side
YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.set(YarnConfiguration.RM_PRINCIPAL, "rm/_HOST@EXAMPLE.COM");
conf.set(
YarnConfiguration.RM_ADDRESS, "localhost:8188");
try {
client.init(conf);
client.start();
Assert.assertEquals("rm/localhost@EXAMPLE.COM", client.timelineDTRenewer);
} finally {
client.stop();
}
}
@Test(timeout = 30000, expected = ApplicationNotFoundException.class) @Test(timeout = 30000, expected = ApplicationNotFoundException.class)
public void testShouldNotRetryForeverForNonNetworkExceptions() throws Exception { public void testShouldNotRetryForeverForNonNetworkExceptions() throws Exception {
YarnConfiguration conf = getConf(); YarnConfiguration conf = getConf();
@ -1353,38 +1159,35 @@ private void testCreateTimelineClientWithError(
timelineClientBestEffort); timelineClientBestEffort);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
timelineVersion); timelineVersion);
YarnClient client = new MockYarnClient(); MockYarnClient client = new MockYarnClient();
if (client instanceof YarnClientImpl) { MockYarnClient spyClient = spy(client);
YarnClientImpl impl = (YarnClientImpl) client; when(spyClient.createTimelineClient()).thenThrow(mockErr);
YarnClientImpl spyClient = spy(impl); CreateTimelineClientErrorVerifier verifier = spy(errVerifier);
when(spyClient.createTimelineClient()).thenThrow(mockErr); spyClient.init(conf);
CreateTimelineClientErrorVerifier verifier = spy(errVerifier); spyClient.start();
spyClient.init(conf);
spyClient.start();
ApplicationSubmissionContext context = ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class); mock(ApplicationSubmissionContext.class);
ContainerLaunchContext containerContext = ContainerLaunchContext containerContext =
mock(ContainerLaunchContext.class); mock(ContainerLaunchContext.class);
ApplicationId applicationId = ApplicationId applicationId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
when(containerContext.getTokens()).thenReturn(null); when(containerContext.getTokens()).thenReturn(null);
when(context.getApplicationId()).thenReturn(applicationId); when(context.getApplicationId()).thenReturn(applicationId);
when(spyClient.isSecurityEnabled()).thenReturn(true); when(spyClient.isSecurityEnabled()).thenReturn(true);
when(context.getAMContainerSpec()).thenReturn(containerContext); when(context.getAMContainerSpec()).thenReturn(containerContext);
try { try {
spyClient.submitApplication(context); spyClient.submitApplication(context);
} catch (Throwable e) { } catch (Throwable e) {
verifier.verifyError(e); verifier.verifyError(e);
} finally { } finally {
// Make sure the verifier runs with expected times // Make sure the verifier runs with expected times
// This is required because in case throwable is swallowed // This is required because in case throwable is swallowed
// and verifyError never gets the chance to run // and verifyError never gets the chance to run
verify(verifier, times(verifier.getExpectedTimes())) verify(verifier, times(verifier.getExpectedTimes()))
.verifyError(any(Throwable.class)); .verifyError(any(Throwable.class));
spyClient.stop(); spyClient.stop();
}
} }
} }

View File

@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager
.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* This class is to test class {@link YarnClientImpl ).
*/
public class TestYarnClientImpl extends ParameterizedSchedulerTestBase {
public TestYarnClientImpl(SchedulerType type) throws IOException {
super(type);
}
@Before
public void setup() {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.setMiniClusterMode(true);
}
@Test
public void testStartWithTimelineV15() {
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient();
client.init(conf);
client.start();
client.stop();
}
@Test
public void testAsyncAPIPollTimeout() {
testAsyncAPIPollTimeoutHelper(null, false);
testAsyncAPIPollTimeoutHelper(0L, true);
testAsyncAPIPollTimeoutHelper(1L, true);
}
private void testAsyncAPIPollTimeoutHelper(Long valueForTimeout,
boolean expectedTimeoutEnforcement) {
YarnClientImpl client = new YarnClientImpl();
try {
Configuration conf = getConf();
if (valueForTimeout != null) {
conf.setLong(
YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
valueForTimeout);
}
client.init(conf);
Assert.assertEquals(
expectedTimeoutEnforcement, client.enforceAsyncAPITimeout());
} finally {
IOUtils.closeQuietly(client);
}
}
@Test
public void testBestEffortTimelineDelegationToken()
throws Exception {
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
YarnClientImpl client = spy(new YarnClientImpl() {
@Override
TimelineClient createTimelineClient() throws IOException, YarnException {
timelineClient = mock(TimelineClient.class);
when(timelineClient.getDelegationToken(any(String.class)))
.thenThrow(new RuntimeException("Best effort test exception"));
return timelineClient;
}
});
client.init(conf);
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,
true);
client.serviceInit(conf);
client.getTimelineDelegationToken();
try {
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT, false);
client.serviceInit(conf);
client.getTimelineDelegationToken();
Assert.fail("Get delegation token should have thrown an exception");
} catch (IOException e) {
// Success
}
}
@Test
public void testAutomaticTimelineDelegationTokenLoading()
throws Exception {
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
TimelineDelegationTokenIdentifier timelineDT =
new TimelineDelegationTokenIdentifier();
final Token<TimelineDelegationTokenIdentifier> dToken =
new Token<>(
timelineDT.getBytes(), new byte[0], timelineDT.getKind(), new Text());
// create a mock client
YarnClientImpl client = spy(new YarnClientImpl() {
@Override
TimelineClient createTimelineClient() throws IOException, YarnException {
timelineClient = mock(TimelineClient.class);
when(timelineClient.getDelegationToken(any(String.class)))
.thenReturn(dToken);
return timelineClient;
}
@Override
protected void serviceStart() {
rmClient = mock(ApplicationClientProtocol.class);
}
@Override
protected void serviceStop() {
}
@Override
public ApplicationReport getApplicationReport(ApplicationId appId) {
ApplicationReport report = mock(ApplicationReport.class);
when(report.getYarnApplicationState())
.thenReturn(YarnApplicationState.RUNNING);
return report;
}
@Override
public boolean isSecurityEnabled() {
return true;
}
});
client.init(conf);
client.start();
try {
// when i == 0, timeline DT already exists, no need to get one more
// when i == 1, timeline DT doesn't exist, need to get one more
for (int i = 0; i < 2; ++i) {
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);
ApplicationId applicationId = ApplicationId.newInstance(0, i + 1);
when(context.getApplicationId()).thenReturn(applicationId);
DataOutputBuffer dob = new DataOutputBuffer();
Credentials credentials = new Credentials();
if (i == 0) {
credentials.addToken(client.timelineService, dToken);
}
credentials.writeTokenStorageToStream(dob);
ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
null, null, null, null, tokens, null);
when(context.getAMContainerSpec()).thenReturn(clc);
client.submitApplication(context);
if (i == 0) {
// GetTimelineDelegationToken shouldn't be called
verify(client, never()).getTimelineDelegationToken();
}
// In either way, token should be there
credentials = new Credentials();
DataInputByteBuffer dibb = new DataInputByteBuffer();
tokens = clc.getTokens();
if (tokens != null) {
dibb.reset(tokens);
credentials.readTokenStorageStream(dibb);
tokens.rewind();
}
Collection<Token<? extends TokenIdentifier>> dTokens =
credentials.getAllTokens();
Assert.assertEquals(1, dTokens.size());
Assert.assertEquals(dToken, dTokens.iterator().next());
}
} finally {
client.stop();
}
}
@Test
public void testParseTimelineDelegationTokenRenewer() {
// Client side
YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.set(YarnConfiguration.RM_PRINCIPAL, "rm/_HOST@EXAMPLE.COM");
conf.set(
YarnConfiguration.RM_ADDRESS, "localhost:8188");
try {
client.init(conf);
client.start();
Assert.assertEquals("rm/localhost@EXAMPLE.COM", client.timelineDTRenewer);
} finally {
client.stop();
}
}
}