diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
index 15e0eaf858..e5631189f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml
@@ -38,6 +38,19 @@
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
index 14133ba4ec..ebc4e761a5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
@@ -30,9 +30,12 @@
import java.util.Set;
import java.util.concurrent.Future;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -131,6 +134,8 @@
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -314,6 +319,16 @@ public YarnClientApplication createApplication()
addTimelineDelegationToken(appContext.getAMContainerSpec());
}
+ // Automatically add a delegation token for the log aggregation path.
+ // This is useful when separate storage is used for log aggregation.
+ try {
+ if (isSecurityEnabled()) {
+ addLogAggregationDelegationToken(appContext.getAMContainerSpec());
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to obtain delegation token for Log Aggregation Path", e);
+ }
+
//TODO: YARN-1763:Handle RM failovers during the submitApplication call.
rmClient.submitApplication(request);
@@ -373,6 +388,50 @@ public YarnClientApplication createApplication()
return applicationId;
}
+ private void addLogAggregationDelegationToken(
+ ContainerLaunchContext clc) throws YarnException, IOException {
+ Credentials credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ ByteBuffer tokens = clc.getTokens();
+ if (tokens != null) {
+ dibb.reset(tokens);
+ credentials.readTokenStorageStream(dibb);
+ tokens.rewind();
+ }
+
+ Configuration conf = getConfig();
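+ // Use the RM principal as the token renewer so the RM can renew the token for the running app.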
+ String masterPrincipal = YarnClientUtils.getRmPrincipal(conf);
+ if (StringUtils.isEmpty(masterPrincipal)) {
+ throw new IOException(
+ "Can't get Master Kerberos principal for use as renewer");
+ }
+ LOG.debug("Delegation Token Renewer: " + masterPrincipal);
+
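+ // Resolve the file system that hosts the remote log aggregation root; it may differ from fs.defaultFS.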
+ LogAggregationFileControllerFactory factory =
+ new LogAggregationFileControllerFactory(conf);
+ LogAggregationFileController fileController =
+ factory.getFileControllerForWrite();
+ Path remoteRootLogDir = fileController.getRemoteRootLogDir();
+ FileSystem fs = remoteRootLogDir.getFileSystem(conf);
+
+ final org.apache.hadoop.security.token.Token<?>[] finalTokens =
+ fs.addDelegationTokens(masterPrincipal, credentials);
+ if (finalTokens != null) {
+ for (org.apache.hadoop.security.token.Token<?> token : finalTokens) {
+ LOG.info("Added delegation token for log aggregation path "
+ + remoteRootLogDir + "; "+token);
+ }
+ }
+
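+ // Serialize the updated credentials back into the AM container launch context.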
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ clc.setTokens(tokens);
+ }
+
private void addTimelineDelegationToken(
ContainerLaunchContext clc) throws YarnException, IOException {
Credentials credentials = new Credentials();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClientImpl.java
index a6259a7be0..8446f9fbda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClientImpl.java
@@ -20,6 +20,12 @@
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
@@ -46,12 +52,15 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -63,6 +72,8 @@
*/
public class TestYarnClientImpl extends ParameterizedSchedulerTestBase {
+ protected static final String YARN_RM = "yarn-rm@EXAMPLE.COM";
+
public TestYarnClientImpl(SchedulerType type) throws IOException {
super(type);
}
@@ -145,6 +156,123 @@ TimelineClient createTimelineClient() throws IOException, YarnException {
}
}
+ // Validates that YarnClientImpl automatically adds an HDFS delegation
+ // token for the log aggregation path when fs.defaultFS is set to
+ // LocalFileSystem and the log aggregation path is on HDFS.
+ @Test
+ public void testAutomaticLogAggregationDelegationToken()
+ throws Exception {
+ Configuration conf = getConf();
+ SecurityUtil.setAuthenticationMethod(
+ UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+ conf.set(YarnConfiguration.RM_PRINCIPAL, YARN_RM);
+ String remoteRootLogPath = "/tmp/app-logs";
+
+ MiniDFSCluster hdfsCluster = null;
+ try {
+ // Step 1: Start a MiniDFSCluster for Log Aggregation Path
+ HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+ hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+ .numDataNodes(1).build();
+
+ Path remoteRootLogDir = new Path(remoteRootLogPath);
+
+ FileSystem fs = hdfsCluster.getFileSystem();
+ fs.mkdirs(remoteRootLogDir);
+ conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ fs.getFileStatus(remoteRootLogDir).getPath().toString());
+
+ // Step 2: Prepare a mock FileSystem that returns a delegation token
+ // when YarnClientImpl requests one via addDelegationTokens
+ DelegationTokenIdentifier hdfsDT = new DelegationTokenIdentifier(new Text(
+ "test"), new Text(YARN_RM), null);
+ final Token<DelegationTokenIdentifier> dToken =
+ new Token<>(hdfsDT.getBytes(), new byte[0], hdfsDT.getKind(),
+ new Text());
+
+ FileSystem mockFs = mock(FileSystem.class);
+ doAnswer(new Answer<Token<?>[]>() {
+ @Override
+ public Token<?>[] answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ ((Credentials) args[1]).addToken(hdfsDT.getKind(), dToken);
+ return new Token[]{dToken};
+ }
+ }).when(mockFs).addDelegationTokens(any(), any());
+
+ FileSystemTestHelper.addFileSystemForTesting(fs.getUri(),
+ hdfsConfig, mockFs);
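+ // Register the mock in the FileSystem cache so lookups for the MiniDFSCluster URI return it.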
+
+ // Step 3: Prepare a Mock YarnClientImpl
+ YarnClientImpl client = spy(new YarnClientImpl() {
+
+ @Override
+ protected void serviceStart() {
+ rmClient = mock(ApplicationClientProtocol.class);
+ }
+
+ @Override
+ protected void serviceStop() {
+ }
+
+ @Override
+ public ApplicationReport getApplicationReport(ApplicationId appId) {
+ ApplicationReport report = mock(ApplicationReport.class);
+ when(report.getYarnApplicationState())
+ .thenReturn(YarnApplicationState.RUNNING);
+ return report;
+ }
+
+ @Override
+ public boolean isSecurityEnabled() {
+ return true;
+ }
+ });
+
+ client.init(conf);
+ client.start();
+
+ // Step 4: Prepare an ApplicationSubmissionContext and submit the app
+ ApplicationSubmissionContext context =
+ mock(ApplicationSubmissionContext.class);
+ ApplicationId applicationId = ApplicationId.newInstance(0, 1);
+ when(context.getApplicationId()).thenReturn(applicationId);
+
+ DataOutputBuffer dob = new DataOutputBuffer();
+ Credentials credentials = new Credentials();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+
+ ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+ null, null, null, null, tokens, null);
+ when(context.getAMContainerSpec()).thenReturn(clc);
+
+ client.submitApplication(context);
+
+ // Step 5: Verify automatic addition of HDFS DT for log aggregation path
+ credentials = new Credentials();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ tokens = clc.getTokens();
+ if (tokens != null) {
+ dibb.reset(tokens);
+ credentials.readTokenStorageStream(dibb);
+ tokens.rewind();
+ }
+ Collection<Token<? extends TokenIdentifier>> dTokens =
+ credentials.getAllTokens();
+ Assert.assertEquals("Failed to place token for Log Aggregation Path",
+ 1, dTokens.size());
+ Assert.assertEquals("Wrong Token for Log Aggregation",
+ hdfsDT.getKind(), dTokens.iterator().next().getKind());
+
+ } finally {
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown();
+ }
+ }
+ }
+
@Test
public void testAutomaticTimelineDelegationTokenLoading()
throws Exception {