HADOOP-18548. Hadoop Archive tool (HAR) should acquire delegation tokens from source and destination file systems (#5355)

Signed-off-by: Chris Nauroth <cnauroth@apache.org>
This commit is contained in:
Galsza 2023-03-30 01:12:02 +02:00 committed by GitHub
parent b4bcbb9515
commit 016362a28b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -37,6 +37,8 @@
import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.Parser; import org.apache.commons.cli.Parser;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -487,6 +489,11 @@ void archive(Path parentPath, List<Path> srcPaths,
+ " should be a directory but is a file"); + " should be a directory but is a file");
} }
conf.set(DST_DIR_LABEL, outputPath.toString()); conf.set(DST_DIR_LABEL, outputPath.toString());
Credentials credentials = conf.getCredentials();
Path[] allPaths = new Path[] {parentPath, dest};
TokenCache.obtainTokensForNamenodes(credentials, allPaths, conf);
conf.setCredentials(credentials);
Path stagingArea; Path stagingArea;
try { try {
stagingArea = JobSubmissionFiles.getStagingDir(new Cluster(conf), stagingArea = JobSubmissionFiles.getStagingDir(new Cluster(conf),
@ -498,11 +505,11 @@ void archive(Path parentPath, List<Path> srcPaths,
NAME+"_"+Integer.toString(new Random().nextInt(Integer.MAX_VALUE), 36)); NAME+"_"+Integer.toString(new Random().nextInt(Integer.MAX_VALUE), 36));
FsPermission mapredSysPerms = FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jobDirectory.getFileSystem(conf), jobDirectory, FileSystem jobfs = jobDirectory.getFileSystem(conf);
FileSystem.mkdirs(jobfs, jobDirectory,
mapredSysPerms); mapredSysPerms);
conf.set(JOB_DIR_LABEL, jobDirectory.toString()); conf.set(JOB_DIR_LABEL, jobDirectory.toString());
//get a tmp directory for input splits //get a tmp directory for input splits
FileSystem jobfs = jobDirectory.getFileSystem(conf);
Path srcFiles = new Path(jobDirectory, "_har_src_files"); Path srcFiles = new Path(jobDirectory, "_har_src_files");
conf.set(SRC_LIST_LABEL, srcFiles.toString()); conf.set(SRC_LIST_LABEL, srcFiles.toString());
SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf, SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,