MAPREDUCE-6550. archive-logs tool changes log ownership to the Yarn user when using DefaultContainerExecutor (rkanter)
This commit is contained in:
parent 7fd00b3db4
commit 6d84cc16b3
@@ -653,6 +653,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6549. multibyte delimiters with LineRecordReader cause
duplicate records (wilfreds via rkanter)

MAPREDUCE-6550. archive-logs tool changes log ownership to the Yarn
user when using DefaultContainerExecutor (rkanter)

Release 2.7.3 - UNRELEASED

INCOMPATIBLE CHANGES
@@ -77,6 +77,7 @@ public class HadoopArchiveLogs implements Tool {
private static final String MEMORY_OPTION = "memory";
private static final String VERBOSE_OPTION = "verbose";
private static final String FORCE_OPTION = "force";
private static final String NO_PROXY_OPTION = "noProxy";

private static final int DEFAULT_MAX_ELIGIBLE = -1;
private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
@@ -94,6 +95,8 @@ public class HadoopArchiveLogs implements Tool {
private boolean verbose = false;
@VisibleForTesting
boolean force = false;
@VisibleForTesting
boolean proxy = true;

@VisibleForTesting
Set<AppInfo> eligibleApplications;
@@ -208,6 +211,12 @@ private void handleOpts(String[] args) throws ParseException {
"Force recreating the working directory if an existing one is found. " +
"This should only be used if you know that another instance is " +
"not currently running");
Option noProxyOpt = new Option(NO_PROXY_OPTION, false,
"When specified, all processing will be done as the user running this" +
" command (or the Yarn user if DefaultContainerExecutor is in " +
"use). When not specified, all processing will be done as the " +
"user who owns that application; if the user running this command" +
" is not allowed to impersonate that user, it will fail");
opts.addOption(helpOpt);
opts.addOption(maxEligibleOpt);
opts.addOption(minNumLogFilesOpt);
@@ -215,6 +224,7 @@ private void handleOpts(String[] args) throws ParseException {
opts.addOption(memoryOpt);
opts.addOption(verboseOpt);
opts.addOption(forceOpt);
opts.addOption(noProxyOpt);

try {
CommandLineParser parser = new GnuParser();
@@ -252,6 +262,9 @@ private void handleOpts(String[] args) throws ParseException {
if (commandLine.hasOption(FORCE_OPTION)) {
force = true;
}
if (commandLine.hasOption(NO_PROXY_OPTION)) {
proxy = false;
}
} catch (ParseException pe) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("mapred archive-logs", opts);
@@ -274,7 +287,7 @@ boolean prepareWorkingDir(FileSystem fs, Path workingDir) throws IOException {
}
fs.mkdirs(workingDir);
fs.setPermission(workingDir,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE));
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true));
return true;
}
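The change above widens the working directory from 770 to 1777: the four-argument FsPermission constructor turns on the sticky bit, so tasks that may run as a different account (for example the Yarn user under DefaultContainerExecutor) can write into the shared directory, while the sticky bit stops them from deleting or renaming files they do not own. A minimal standalone sketch of the two permission values, not part of the patch and with an illustrative class name:

import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

public class WorkingDirPermissionSketch {
  public static void main(String[] args) {
    // Old value: rwxrwx--- (770), no sticky bit.
    FsPermission before =
        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);
    // New value: rwxrwxrwt (1777), world-writable like /tmp but sticky.
    FsPermission after =
        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true);
    System.out.println(before + " -> " + after);
    System.out.println("sticky bit set: " + after.getStickyBit());
  }
}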
@@ -479,6 +492,9 @@ void generateScript(File localScript, Path workingDir,
fw.write(remoteRootLogDir.toString());
fw.write(" -suffix ");
fw.write(suffix);
if (!proxy) {
fw.write(" -noProxy\n");
}
fw.write("\n");
} finally {
if (fw != null) {
@@ -31,33 +31,45 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.File;
import java.security.PrivilegedExceptionAction;

/**
* This is a child program designed to be used by the {@link HadoopArchiveLogs}
* tool via the Distributed Shell. It's not meant to be run directly.
*/
public class HadoopArchiveLogsRunner implements Tool {
private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class);
private static final Log LOG =
LogFactory.getLog(HadoopArchiveLogsRunner.class);

private static final String APP_ID_OPTION = "appId";
private static final String USER_OPTION = "user";
private static final String WORKING_DIR_OPTION = "workingDir";
private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir";
private static final String REMOTE_ROOT_LOG_DIR_OPTION = "remoteRootLogDir";
private static final String SUFFIX_OPTION = "suffix";
private static final String NO_PROXY_OPTION = "noProxy";

private String appId;
private String user;
private String workingDir;
private String remoteLogDir;
private String suffix;
private boolean proxy;

private JobConf conf;

private static final FsPermission HAR_DIR_PERM =
new FsPermission(FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE);
private static final FsPermission HAR_INNER_FILES_PERM =
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE);

public HadoopArchiveLogsRunner(Configuration conf) {
setConf(conf);
}
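For context, the javadoc above describes HadoopArchiveLogsRunner as a child program: the script that HadoopArchiveLogs generates (see the generateScript hunk earlier and the _testGenerateScript assertions below) launches it once per application with a command roughly of the form shown here, where the angle brackets are placeholders for the concrete directories and the bracketed flag is only appended when proxying is disabled:

"$HADOOP_PREFIX"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir <workingDir> -remoteRootLogDir <remoteRootLogDir> -suffix <suffix> [-noProxy]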
@@ -87,13 +99,40 @@ public static void main(String[] args) {
@Override
public int run(String[] args) throws Exception {
handleOpts(args);

Integer exitCode = 1;
UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
// If we're running as the user, then no need to impersonate
// (which might fail if user is not a proxyuser for themselves)
// Also if !proxy is set
if (!proxy || loginUser.getShortUserName().equals(user)) {
LOG.info("Running as " + user);
exitCode = runInternal();
} else {
// Otherwise impersonate user. If we're not allowed to, then this will
// fail with an Exception
LOG.info("Running as " + loginUser.getShortUserName() + " but will " +
"impersonate " + user);
UserGroupInformation proxyUser =
UserGroupInformation.createProxyUser(user, loginUser);
exitCode = proxyUser.doAs(new PrivilegedExceptionAction<Integer>() {
@Override
public Integer run() throws Exception {
return runInternal();
}
});
}
return exitCode;
}
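The run() method above is the standard Hadoop proxy-user pattern: do the work directly when the login user already matches the application owner (or when -noProxy was requested), otherwise wrap it in doAs() under a proxy UGI. A minimal self-contained sketch of that pattern, not taken from the patch, with illustrative names (archiveAs, doWork) and assuming the cluster's hadoop.proxyuser settings permit the impersonation:

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserSketch {
  static int archiveAs(String appOwner) throws Exception {
    UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
    if (loginUser.getShortUserName().equals(appOwner)) {
      // Already the right user; impersonating yourself can fail, so skip it.
      return doWork();
    }
    // Build a proxy UGI; the authorization check happens server-side
    // (NameNode/ResourceManager) when the impersonated calls are made, and
    // is rejected unless the hadoop.proxyuser.<loginUser> rules allow it.
    UserGroupInformation proxyUgi =
        UserGroupInformation.createProxyUser(appOwner, loginUser);
    return proxyUgi.doAs(new PrivilegedExceptionAction<Integer>() {
      @Override
      public Integer run() throws Exception {
        return doWork();  // runs with appOwner as the effective user
      }
    });
  }

  private static int doWork() {
    return 0;  // placeholder for the real archiving work
  }
}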
private int runInternal() throws Exception {
String remoteAppLogDir = remoteLogDir + File.separator + user
+ File.separator + suffix + File.separator + appId;

// Run 'hadoop archives' command in local mode
Configuration haConf = new Configuration(getConf());
haConf.set("mapreduce.framework.name", "local");
HadoopArchives ha = new HadoopArchives(haConf);
conf.set("mapreduce.framework.name", "local");
// Set the umask so we get 640 files and 750 dirs
conf.set("fs.permissions.umask-mode", "027");
HadoopArchives ha = new HadoopArchives(conf);
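The umask of 027 set just above clears the group-write bit and every other-permission bit, so files the local archive job requests with the default 666 come out as 640 (rw-r-----) and directories requested with 777 come out as 750 (rwxr-x---), which lines up with the HAR_INNER_FILES_PERM and HAR_DIR_PERM constants declared earlier and with the 640 assertions added to TestHadoopArchiveLogsRunner below.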
String[] haArgs = {
"-archiveName",
appId + ".har",

@@ -113,9 +152,9 @@ public int run(String[] args) throws Exception {
// Move har file to correct location and delete original logs
try {
fs = FileSystem.get(conf);
Path harDest = new Path(remoteAppLogDir, appId + ".har");
LOG.info("Moving har to original location");
fs.rename(new Path(workingDir, appId + ".har"),
new Path(remoteAppLogDir, appId + ".har"));
fs.rename(new Path(workingDir, appId + ".har"), harDest);
LOG.info("Deleting original logs");
for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir),
new PathFilter() {

@@ -131,7 +170,6 @@ public boolean accept(Path path) {
fs.close();
}
}

return 0;
}
@@ -144,24 +182,30 @@ private void handleOpts(String[] args) throws ParseException {
Option workingDirOpt = new Option(WORKING_DIR_OPTION, true,
"Working Directory");
workingDirOpt.setRequired(true);
Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR, true,
Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR_OPTION, true,
"Remote Root Log Directory");
remoteLogDirOpt.setRequired(true);
Option suffixOpt = new Option(SUFFIX_OPTION, true, "Suffix");
suffixOpt.setRequired(true);
Option useProxyOpt = new Option(NO_PROXY_OPTION, false, "Use Proxy");
opts.addOption(appIdOpt);
opts.addOption(userOpt);
opts.addOption(workingDirOpt);
opts.addOption(remoteLogDirOpt);
opts.addOption(suffixOpt);
opts.addOption(useProxyOpt);

CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(opts, args);
appId = commandLine.getOptionValue(APP_ID_OPTION);
user = commandLine.getOptionValue(USER_OPTION);
workingDir = commandLine.getOptionValue(WORKING_DIR_OPTION);
remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR);
remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR_OPTION);
suffix = commandLine.getOptionValue(SUFFIX_OPTION);
proxy = true;
if (commandLine.hasOption(NO_PROXY_OPTION)) {
proxy = false;
}
}

@Override
@@ -48,6 +48,14 @@ How to Archive Logs
each container (default: 1024)
-minNumberLogFiles <n> The minimum number of log files required
to be eligible (default: 20)
-noProxy When specified, all processing will be
done as the user running this command (or
the Yarn user if DefaultContainerExecutor
is in use). When not specified, all
processing will be done as the user who
owns that application; if the user
running this command is not allowed to
impersonate that user, it will fail
-verbose Print more details.

The tool only supports running one instance on a cluster at a time in order

@@ -77,6 +85,15 @@ The tool works by performing the following procedure:
the ``hadoop archives`` command for a single application and replaces
its aggregated log files with the resulting archive.

The ``-noProxy`` option makes the tool process everything as the user who is
currently running it, or the Yarn user if DefaultContainerExecutor is in use.
When not specified, all processing will be done by the user who owns that
application; if the user running this command is not allowed to impersonate that
user, it will fail. This is useful if you want an admin user to handle all
aggregation without enabling impersonation. With ``-noProxy`` the resulting
HAR files will be owned by whoever ran the tool, instead of whoever originally
owned the logs.
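For example, running ``mapred archive-logs -noProxy`` from an administrative account (an illustrative invocation, any of the other options above can be combined with it) leaves every resulting HAR file owned by that account instead of by the individual application owners.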
The ``-verbose`` option makes the tool print more details about what it's
doing.
@@ -163,7 +163,7 @@ public void testCheckMaxEligible() throws Exception {
Assert.assertTrue(hal.eligibleApplications.contains(app3));
}

@Test(timeout = 10000)
@Test(timeout = 30000)
public void testFilterAppsByAggregatedStatus() throws Exception {
MiniYARNCluster yarnCluster = null;
try {
@@ -246,6 +246,11 @@ public void testFilterAppsByAggregatedStatus() throws Exception {

@Test(timeout = 10000)
public void testGenerateScript() throws Exception {
_testGenerateScript(false);
_testGenerateScript(true);
}

private void _testGenerateScript(boolean proxy) throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);

@@ -254,6 +259,7 @@ public void testGenerateScript() throws Exception {
USER));
hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
USER));
hal.proxy = proxy;

File localScript = new File("target", "script.sh");
Path workingDir = new Path("/tmp", "working");
@@ -286,10 +292,21 @@ public void testGenerateScript() throws Exception {
Assert.assertEquals("fi", lines[12]);
Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH="));
Assert.assertEquals("\"$HADOOP_PREFIX\"/bin/hadoop org.apache.hadoop.tools." +
"HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir "
+ workingDir.toString() + " -remoteRootLogDir " +
remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
if (proxy) {
Assert.assertEquals(
"\"$HADOOP_PREFIX\"/bin/hadoop org.apache.hadoop.tools." +
"HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " +
"-workingDir " + workingDir.toString() + " -remoteRootLogDir " +
remoteRootLogDir.toString() + " -suffix " + suffix,
lines[15]);
} else {
Assert.assertEquals(
"\"$HADOOP_PREFIX\"/bin/hadoop org.apache.hadoop.tools." +
"HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " +
"-workingDir " + workingDir.toString() + " -remoteRootLogDir " +
remoteRootLogDir.toString() + " -suffix " + suffix + " -noProxy",
lines[15]);
}
}

/**
@@ -325,7 +342,7 @@ public void testPrepareWorkingDir() throws Exception {
Assert.assertTrue(dirPrepared);
Assert.assertTrue(fs.exists(workingDir));
Assert.assertEquals(
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE),
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
fs.getFileStatus(workingDir).getPermission());
// Throw a file in the dir
Path dummyFile = new Path(workingDir, "dummy.txt");
@@ -337,6 +354,9 @@ public void testPrepareWorkingDir() throws Exception {
Assert.assertFalse(dirPrepared);
Assert.assertTrue(fs.exists(workingDir));
Assert.assertTrue(fs.exists(dummyFile));
Assert.assertEquals(
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
fs.getFileStatus(workingDir).getPermission());
// -force is true and the dir exists, so it will recreate it and the dummy
// won't exist anymore
hal.force = true;
@@ -344,7 +364,7 @@ public void testPrepareWorkingDir() throws Exception {
Assert.assertTrue(dirPrepared);
Assert.assertTrue(fs.exists(workingDir));
Assert.assertEquals(
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE),
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true),
fs.getFileStatus(workingDir).getPermission());
Assert.assertFalse(fs.exists(dummyFile));
}
@@ -24,7 +24,10 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -47,7 +50,7 @@ public class TestHadoopArchiveLogsRunner {
new Random().nextBytes(DUMMY_DATA);
}

@Test(timeout = 30000)
@Test(timeout = 50000)
public void testHadoopArchiveLogs() throws Exception {
MiniYARNCluster yarnCluster = null;
MiniDFSCluster dfsCluster = null;
@@ -63,6 +66,7 @@ public void testHadoopArchiveLogs() throws Exception {
yarnCluster.start();
conf = yarnCluster.getConfig();
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
conf = new JobConf(conf);

ApplicationId app1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
@@ -108,10 +112,25 @@ public int compare(FileStatus o1, FileStatus o2) {
});
Assert.assertEquals("log1", harLogs[0].getPath().getName());
Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen());
Assert.assertEquals(
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
harLogs[0].getPermission());
Assert.assertEquals(System.getProperty("user.name"),
harLogs[0].getOwner());
Assert.assertEquals("log2", harLogs[1].getPath().getName());
Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen());
Assert.assertEquals(
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
harLogs[1].getPermission());
Assert.assertEquals(System.getProperty("user.name"),
harLogs[1].getOwner());
Assert.assertEquals("log3", harLogs[2].getPath().getName());
Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen());
Assert.assertEquals(
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.NONE),
harLogs[2].getPermission());
Assert.assertEquals(System.getProperty("user.name"),
harLogs[2].getOwner());
Assert.assertEquals(0, fs.listStatus(workingDir).length);
} finally {
if (yarnCluster != null) {