YARN-2185. Use pipes when localizing archives. Contributed by Miklos Szegedi

This commit is contained in:
Jason Lowe 2018-01-31 09:39:43 -06:00
parent 1453a04e92
commit 7288b08330
4 changed files with 466 additions and 100 deletions

View File

@ -20,27 +20,35 @@
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.file.AccessDeniedException; import java.nio.file.AccessDeniedException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.jar.Attributes; import java.util.jar.Attributes;
import java.util.jar.JarOutputStream; import java.util.jar.JarOutputStream;
import java.util.jar.Manifest; import java.util.jar.Manifest;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipFile; import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@ -74,6 +82,11 @@ public class FileUtil {
* */ * */
public static final int SYMLINK_NO_PRIVILEGE = 2; public static final int SYMLINK_NO_PRIVILEGE = 2;
/**
* Buffer size for copy the content of compressed file to new file.
*/
private static final int BUFFER_SIZE = 8_192;
/** /**
* convert an array of FileStatus to an array of Path * convert an array of FileStatus to an array of Path
* *
@ -525,6 +538,22 @@ public static String makeShellPath(File file) throws IOException {
return makeShellPath(file, false); return makeShellPath(file, false);
} }
/**
* Convert a os-native filename to a path that works for the shell
* and avoids script injection attacks.
* @param file The filename to convert
* @return The unix pathname
* @throws IOException on windows, there can be problems with the subprocess
*/
public static String makeSecureShellPath(File file) throws IOException {
if (Shell.WINDOWS) {
// Currently it is never called, but it might be helpful in the future.
throw new UnsupportedOperationException("Not implemented for Windows");
} else {
return makeShellPath(file, false).replace("'", "'\\''");
}
}
/** /**
* Convert a os-native filename to a path that works for the shell. * Convert a os-native filename to a path that works for the shell.
* @param file The filename to convert * @param file The filename to convert
@ -576,11 +605,48 @@ public static long getDU(File dir) {
} }
/** /**
* Given a File input it will unzip the file in a the unzip directory * Given a stream input it will unzip the it in the unzip directory.
* passed as the second parameter
* @param inputStream The zip file as input
* @param toDir The unzip directory where to unzip the zip file.
* @throws IOException an exception occurred
*/
public static void unZip(InputStream inputStream, File toDir)
throws IOException {
try (ZipInputStream zip = new ZipInputStream(inputStream)) {
int numOfFailedLastModifiedSet = 0;
for(ZipEntry entry = zip.getNextEntry();
entry != null;
entry = zip.getNextEntry()) {
if (!entry.isDirectory()) {
File file = new File(toDir, entry.getName());
File parent = file.getParentFile();
if (!parent.mkdirs() &&
!parent.isDirectory()) {
throw new IOException("Mkdirs failed to create " +
parent.getAbsolutePath());
}
try (OutputStream out = new FileOutputStream(file)) {
IOUtils.copyBytes(zip, out, BUFFER_SIZE);
}
if (!file.setLastModified(entry.getTime())) {
numOfFailedLastModifiedSet++;
}
}
}
if (numOfFailedLastModifiedSet > 0) {
LOG.warn("Could not set last modfied time for {} file(s)",
numOfFailedLastModifiedSet);
}
}
}
/**
* Given a File input it will unzip it in the unzip directory.
* passed as the second parameter * passed as the second parameter
* @param inFile The zip file as input * @param inFile The zip file as input
* @param unzipDir The unzip directory where to unzip the zip file. * @param unzipDir The unzip directory where to unzip the zip file.
* @throws IOException * @throws IOException An I/O exception has occurred
*/ */
public static void unZip(File inFile, File unzipDir) throws IOException { public static void unZip(File inFile, File unzipDir) throws IOException {
Enumeration<? extends ZipEntry> entries; Enumeration<? extends ZipEntry> entries;
@ -620,6 +686,138 @@ public static void unZip(File inFile, File unzipDir) throws IOException {
} }
} }
/**
* Run a command and send the contents of an input stream to it.
* @param inputStream Input stream to forward to the shell command
* @param command shell command to run
* @throws IOException read or write failed
* @throws InterruptedException command interrupted
* @throws ExecutionException task submit failed
*/
private static void runCommandOnStream(
InputStream inputStream, String command)
throws IOException, InterruptedException, ExecutionException {
ExecutorService executor = null;
ProcessBuilder builder = new ProcessBuilder();
builder.command(
Shell.WINDOWS ? "cmd" : "bash",
Shell.WINDOWS ? "/c" : "-c",
command);
Process process = builder.start();
int exitCode;
try {
// Consume stdout and stderr, to avoid blocking the command
executor = Executors.newFixedThreadPool(2);
Future output = executor.submit(() -> {
try {
// Read until the output stream receives an EOF and closed.
if (LOG.isDebugEnabled()) {
// Log directly to avoid out of memory errors
try (BufferedReader reader =
new BufferedReader(
new InputStreamReader(process.getInputStream(),
Charset.forName("UTF-8")))) {
String line;
while((line = reader.readLine()) != null) {
LOG.debug(line);
}
}
} else {
org.apache.commons.io.IOUtils.copy(
process.getInputStream(),
new IOUtils.NullOutputStream());
}
} catch (IOException e) {
LOG.debug(e.getMessage());
}
});
Future error = executor.submit(() -> {
try {
// Read until the error stream receives an EOF and closed.
if (LOG.isDebugEnabled()) {
// Log directly to avoid out of memory errors
try (BufferedReader reader =
new BufferedReader(
new InputStreamReader(process.getErrorStream(),
Charset.forName("UTF-8")))) {
String line;
while((line = reader.readLine()) != null) {
LOG.debug(line);
}
}
} else {
org.apache.commons.io.IOUtils.copy(
process.getErrorStream(),
new IOUtils.NullOutputStream());
}
} catch (IOException e) {
LOG.debug(e.getMessage());
}
});
// Pass the input stream to the command to process
try {
org.apache.commons.io.IOUtils.copy(
inputStream, process.getOutputStream());
} finally {
process.getOutputStream().close();
}
// Wait for both stdout and stderr futures to finish
error.get();
output.get();
} finally {
// Clean up the threads
if (executor != null) {
executor.shutdown();
}
// Wait to avoid leaking the child process
exitCode = process.waitFor();
}
if (exitCode != 0) {
throw new IOException(
String.format(
"Error executing command. %s " +
"Process exited with exit code %d.",
command, exitCode));
}
}
/**
* Given a Tar File as input it will untar the file in a the untar directory
* passed as the second parameter
*
* This utility will untar ".tar" files and ".tar.gz","tgz" files.
*
* @param inputStream The tar file as input.
* @param untarDir The untar directory where to untar the tar file.
* @param gzipped The input stream is gzipped
* TODO Use magic number and PusbackInputStream to identify
* @throws IOException an exception occurred
* @throws InterruptedException command interrupted
* @throws ExecutionException task submit failed
*/
public static void unTar(InputStream inputStream, File untarDir,
boolean gzipped)
throws IOException, InterruptedException, ExecutionException {
if (!untarDir.mkdirs()) {
if (!untarDir.isDirectory()) {
throw new IOException("Mkdirs failed to create " + untarDir);
}
}
if(Shell.WINDOWS) {
// Tar is not native to Windows. Use simple Java based implementation for
// tests and simple tar archives
unTarUsingJava(inputStream, untarDir, gzipped);
} else {
// spawn tar utility to untar archive for full fledged unix behavior such
// as resolving symlinks in tar archives
unTarUsingTar(inputStream, untarDir, gzipped);
}
}
/** /**
* Given a Tar File as input it will untar the file in a the untar directory * Given a Tar File as input it will untar the file in a the untar directory
* passed as the second parameter * passed as the second parameter
@ -650,23 +848,41 @@ public static void unTar(File inFile, File untarDir) throws IOException {
} }
} }
private static void unTarUsingTar(InputStream inputStream, File untarDir,
boolean gzipped)
throws IOException, InterruptedException, ExecutionException {
StringBuilder untarCommand = new StringBuilder();
if (gzipped) {
untarCommand.append("gzip -dc | (");
}
untarCommand.append("cd '");
untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
untarCommand.append("' && ");
untarCommand.append("tar -x ");
if (gzipped) {
untarCommand.append(")");
}
runCommandOnStream(inputStream, untarCommand.toString());
}
private static void unTarUsingTar(File inFile, File untarDir, private static void unTarUsingTar(File inFile, File untarDir,
boolean gzipped) throws IOException { boolean gzipped) throws IOException {
StringBuffer untarCommand = new StringBuffer(); StringBuffer untarCommand = new StringBuffer();
if (gzipped) { if (gzipped) {
untarCommand.append(" gzip -dc '"); untarCommand.append(" gzip -dc '");
untarCommand.append(FileUtil.makeShellPath(inFile)); untarCommand.append(FileUtil.makeSecureShellPath(inFile));
untarCommand.append("' | ("); untarCommand.append("' | (");
} }
untarCommand.append("cd '"); untarCommand.append("cd '");
untarCommand.append(FileUtil.makeShellPath(untarDir)); untarCommand.append(FileUtil.makeSecureShellPath(untarDir));
untarCommand.append("' ; "); untarCommand.append("' && ");
untarCommand.append("tar -xf "); untarCommand.append("tar -xf ");
if (gzipped) { if (gzipped) {
untarCommand.append(" -)"); untarCommand.append(" -)");
} else { } else {
untarCommand.append(FileUtil.makeShellPath(inFile)); untarCommand.append(FileUtil.makeSecureShellPath(inFile));
} }
String[] shellCmd = { "bash", "-c", untarCommand.toString() }; String[] shellCmd = { "bash", "-c", untarCommand.toString() };
ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd); ShellCommandExecutor shexec = new ShellCommandExecutor(shellCmd);
@ -701,6 +917,29 @@ private static void unTarUsingJava(File inFile, File untarDir,
} }
} }
private static void unTarUsingJava(InputStream inputStream, File untarDir,
boolean gzipped) throws IOException {
TarArchiveInputStream tis = null;
try {
if (gzipped) {
inputStream = new BufferedInputStream(new GZIPInputStream(
inputStream));
} else {
inputStream =
new BufferedInputStream(inputStream);
}
tis = new TarArchiveInputStream(inputStream);
for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null;) {
unpackEntries(tis, entry, untarDir);
entry = tis.getNextTarEntry();
}
} finally {
IOUtils.cleanupWithLogger(LOG, tis, inputStream);
}
}
private static void unpackEntries(TarArchiveInputStream tis, private static void unpackEntries(TarArchiveInputStream tis,
TarArchiveEntry entry, File outputDir) throws IOException { TarArchiveEntry entry, File outputDir) throws IOException {
if (entry.isDirectory()) { if (entry.isDirectory()) {

View File

@ -34,9 +34,11 @@
import java.util.List; import java.util.List;
import java.util.jar.JarEntry; import java.util.jar.JarEntry;
import java.util.jar.JarFile; import java.util.jar.JarFile;
import java.util.jar.JarInputStream;
import java.util.jar.Manifest; import java.util.jar.Manifest;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.io.input.TeeInputStream;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
@ -94,6 +96,69 @@ public static void unJar(File jarFile, File toDir) throws IOException {
unJar(jarFile, toDir, MATCH_ANY); unJar(jarFile, toDir, MATCH_ANY);
} }
/**
* Unpack matching files from a jar. Entries inside the jar that do
* not match the given pattern will be skipped.
*
* @param inputStream the jar stream to unpack
* @param toDir the destination directory into which to unpack the jar
* @param unpackRegex the pattern to match jar entries against
*
* @throws IOException if an I/O error has occurred or toDir
* cannot be created and does not already exist
*/
public static void unJar(InputStream inputStream, File toDir,
Pattern unpackRegex)
throws IOException {
try (JarInputStream jar = new JarInputStream(inputStream)) {
int numOfFailedLastModifiedSet = 0;
for (JarEntry entry = jar.getNextJarEntry();
entry != null;
entry = jar.getNextJarEntry()) {
if (!entry.isDirectory() &&
unpackRegex.matcher(entry.getName()).matches()) {
File file = new File(toDir, entry.getName());
ensureDirectory(file.getParentFile());
try (OutputStream out = new FileOutputStream(file)) {
IOUtils.copyBytes(jar, out, BUFFER_SIZE);
}
if (!file.setLastModified(entry.getTime())) {
numOfFailedLastModifiedSet++;
}
}
}
if (numOfFailedLastModifiedSet > 0) {
LOG.warn("Could not set last modfied time for {} file(s)",
numOfFailedLastModifiedSet);
}
}
}
/**
* Unpack matching files from a jar. Entries inside the jar that do
* not match the given pattern will be skipped. Keep also a copy
* of the entire jar in the same directory for backward compatibility.
* TODO remove this feature in a new release and do only unJar
*
* @param inputStream the jar stream to unpack
* @param toDir the destination directory into which to unpack the jar
* @param unpackRegex the pattern to match jar entries against
*
* @throws IOException if an I/O error has occurred or toDir
* cannot be created and does not already exist
*/
@Deprecated
public static void unJarAndSave(InputStream inputStream, File toDir,
String name, Pattern unpackRegex)
throws IOException{
File file = new File(toDir, name);
ensureDirectory(toDir);
try (OutputStream jar = new FileOutputStream(file);
TeeInputStream teeInputStream = new TeeInputStream(inputStream, jar)) {
unJar(teeInputStream, toDir, unpackRegex);
}
}
/** /**
* Unpack matching files from a jar. Entries inside the jar that do * Unpack matching files from a jar. Entries inside the jar that do
* not match the given pattern will be skipped. * not match the given pattern will be skipped.

View File

@ -21,6 +21,8 @@
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -29,6 +31,7 @@
import java.util.regex.Pattern; import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@ -54,6 +57,7 @@
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.yarn.exceptions.YarnException;
/** /**
* Download a single URL to the local disk. * Download a single URL to the local disk.
@ -247,9 +251,21 @@ private static FileStatus getFileStatus(final FileSystem fs, final Path path,
} }
} }
private Path copy(Path sCopy, Path dstdir) throws IOException { /**
* Localize files.
* @param destination destination directory
* @throws IOException cannot read or write file
* @throws YarnException subcommand returned an error
*/
private void verifyAndCopy(Path destination)
throws IOException, YarnException {
final Path sCopy;
try {
sCopy = resource.getResource().toPath();
} catch (URISyntaxException e) {
throw new IOException("Invalid resource", e);
}
FileSystem sourceFs = sCopy.getFileSystem(conf); FileSystem sourceFs = sCopy.getFileSystem(conf);
Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
FileStatus sStat = sourceFs.getFileStatus(sCopy); FileStatus sStat = sourceFs.getFileStatus(sCopy);
if (sStat.getModificationTime() != resource.getTimestamp()) { if (sStat.getModificationTime() != resource.getTimestamp()) {
throw new IOException("Resource " + sCopy + throw new IOException("Resource " + sCopy +
@ -264,82 +280,108 @@ private Path copy(Path sCopy, Path dstdir) throws IOException {
} }
} }
FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false, downloadAndUnpack(sCopy, destination);
true, conf);
return dCopy;
} }
private long unpack(File localrsrc, File dst) throws IOException { /**
switch (resource.getType()) { * Copy source path to destination with localization rules.
case ARCHIVE: { * @param source source path to copy. Typically HDFS
String lowerDst = StringUtils.toLowerCase(dst.getName()); * @param destination destination path. Typically local filesystem
if (lowerDst.endsWith(".jar")) { * @exception YarnException Any error has occurred
RunJar.unJar(localrsrc, dst); */
} else if (lowerDst.endsWith(".zip")) { private void downloadAndUnpack(Path source, Path destination)
FileUtil.unZip(localrsrc, dst); throws YarnException {
} else if (lowerDst.endsWith(".tar.gz") || try {
lowerDst.endsWith(".tgz") || FileSystem sourceFileSystem = source.getFileSystem(conf);
lowerDst.endsWith(".tar")) { FileSystem destinationFileSystem = destination.getFileSystem(conf);
FileUtil.unTar(localrsrc, dst); if (sourceFileSystem.getFileStatus(source).isDirectory()) {
FileUtil.copy(
sourceFileSystem, source,
destinationFileSystem, destination, false,
true, conf);
} else { } else {
LOG.warn("Cannot unpack " + localrsrc); unpack(source, destination, sourceFileSystem, destinationFileSystem);
if (!localrsrc.renameTo(dst)) {
throw new IOException("Unable to rename file: [" + localrsrc
+ "] to [" + dst + "]");
}
} }
} catch (Exception e) {
throw new YarnException("Download and unpack failed", e);
} }
break; }
case PATTERN: {
/**
* Do the localization action on the input stream.
* We use the deprecated method RunJar.unJarAndSave for compatibility reasons.
* We should use the more efficient RunJar.unJar in the future.
* @param source Source path
* @param destination Destination pth
* @param sourceFileSystem Source filesystem
* @param destinationFileSystem Destination filesystem
* @throws IOException Could not read or write stream
* @throws InterruptedException Operation interrupted by caller
* @throws ExecutionException Could not create thread pool execution
*/
@SuppressWarnings("deprecation")
private void unpack(Path source, Path destination,
FileSystem sourceFileSystem,
FileSystem destinationFileSystem)
throws IOException, InterruptedException, ExecutionException {
try (InputStream inputStream = sourceFileSystem.open(source)) {
File dst = new File(destination.toUri());
String lowerDst = StringUtils.toLowerCase(dst.getName()); String lowerDst = StringUtils.toLowerCase(dst.getName());
if (lowerDst.endsWith(".jar")) { switch (resource.getType()) {
String p = resource.getPattern(); case ARCHIVE:
RunJar.unJar(localrsrc, dst, if (lowerDst.endsWith(".jar")) {
p == null ? RunJar.MATCH_ANY : Pattern.compile(p)); RunJar.unJar(inputStream, dst, RunJar.MATCH_ANY);
File newDst = new File(dst, dst.getName()); } else if (lowerDst.endsWith(".zip")) {
if (!dst.exists() && !dst.mkdir()) { FileUtil.unZip(inputStream, dst);
throw new IOException("Unable to create directory: [" + dst + "]"); } else if (lowerDst.endsWith(".tar.gz") ||
lowerDst.endsWith(".tgz") ||
lowerDst.endsWith(".tar")) {
FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
} else {
LOG.warn("Cannot unpack " + source);
try (OutputStream outputStream =
destinationFileSystem.create(destination, true)) {
IOUtils.copy(inputStream, outputStream);
}
} }
if (!localrsrc.renameTo(newDst)) { break;
throw new IOException("Unable to rename file: [" + localrsrc case PATTERN:
+ "] to [" + newDst + "]"); if (lowerDst.endsWith(".jar")) {
String p = resource.getPattern();
if (!dst.exists() && !dst.mkdir()) {
throw new IOException("Unable to create directory: [" + dst + "]");
}
RunJar.unJarAndSave(inputStream, dst, source.getName(),
p == null ? RunJar.MATCH_ANY : Pattern.compile(p));
} else if (lowerDst.endsWith(".zip")) {
LOG.warn("Treating [" + source + "] as an archive even though it " +
"was specified as PATTERN");
FileUtil.unZip(inputStream, dst);
} else if (lowerDst.endsWith(".tar.gz") ||
lowerDst.endsWith(".tgz") ||
lowerDst.endsWith(".tar")) {
LOG.warn("Treating [" + source + "] as an archive even though it " +
"was specified as PATTERN");
FileUtil.unTar(inputStream, dst, lowerDst.endsWith("gz"));
} else {
LOG.warn("Cannot unpack " + source);
try (OutputStream outputStream =
destinationFileSystem.create(destination, true)) {
IOUtils.copy(inputStream, outputStream);
}
} }
} else if (lowerDst.endsWith(".zip")) { break;
LOG.warn("Treating [" + localrsrc + "] as an archive even though it " + case FILE:
"was specified as PATTERN"); default:
FileUtil.unZip(localrsrc, dst); try (OutputStream outputStream =
} else if (lowerDst.endsWith(".tar.gz") || destinationFileSystem.create(destination, true)) {
lowerDst.endsWith(".tgz") || IOUtils.copy(inputStream, outputStream);
lowerDst.endsWith(".tar")) {
LOG.warn("Treating [" + localrsrc + "] as an archive even though it " +
"was specified as PATTERN");
FileUtil.unTar(localrsrc, dst);
} else {
LOG.warn("Cannot unpack " + localrsrc);
if (!localrsrc.renameTo(dst)) {
throw new IOException("Unable to rename file: [" + localrsrc
+ "] to [" + dst + "]");
} }
break;
} }
// TODO Should calculate here before returning
//return FileUtil.getDU(destDir);
} }
break;
case FILE:
default:
if (!localrsrc.renameTo(dst)) {
throw new IOException("Unable to rename file: [" + localrsrc
+ "] to [" + dst + "]");
}
break;
}
if(localrsrc.isFile()){
try {
files.delete(new Path(localrsrc.toString()), false);
} catch (IOException ignore) {
}
}
return 0;
// TODO Should calculate here before returning
//return FileUtil.getDU(destDir);
} }
@Override @Override
@ -352,27 +394,37 @@ public Path call() throws Exception {
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Starting to download " + sCopy); LOG.debug(String.format("Starting to download %s %s %s",
sCopy,
resource.getType(),
resource.getPattern()));
} }
createDir(destDirPath, cachePerms); final Path destinationTmp = new Path(destDirPath + "_tmp");
final Path dst_work = new Path(destDirPath + "_tmp"); createDir(destinationTmp, PRIVATE_DIR_PERMS);
createDir(dst_work, cachePerms); Path dFinal =
Path dFinal = files.makeQualified(new Path(dst_work, sCopy.getName())); files.makeQualified(new Path(destinationTmp, sCopy.getName()));
try { try {
Path dTmp = null == userUgi ? files.makeQualified(copy(sCopy, dst_work)) if (userUgi == null) {
: userUgi.doAs(new PrivilegedExceptionAction<Path>() { verifyAndCopy(dFinal);
public Path run() throws Exception { } else {
return files.makeQualified(copy(sCopy, dst_work)); userUgi.doAs(new PrivilegedExceptionAction<Void>() {
}; @Override
}); public Void run() throws Exception {
unpack(new File(dTmp.toUri()), new File(dFinal.toUri())); verifyAndCopy(dFinal);
changePermissions(dFinal.getFileSystem(conf), dFinal); return null;
files.rename(dst_work, destDirPath, Rename.OVERWRITE); }
});
}
Path destinationTmpfilesQualified = files.makeQualified(destinationTmp);
changePermissions(
destinationTmpfilesQualified.getFileSystem(conf),
destinationTmpfilesQualified);
files.rename(destinationTmp, destDirPath, Rename.OVERWRITE);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("File has been downloaded to " + LOG.debug(String.format("File has been downloaded to %s from %s",
new Path(destDirPath, sCopy.getName())); new Path(destDirPath, sCopy.getName()), sCopy));
} }
} catch (Exception e) { } catch (Exception e) {
try { try {
@ -382,7 +434,7 @@ public Path run() throws Exception {
throw e; throw e;
} finally { } finally {
try { try {
files.delete(dst_work, true); files.delete(destinationTmp, true);
} catch (FileNotFoundException ignore) { } catch (FileNotFoundException ignore) {
} }
conf = null; conf = null;

View File

@ -82,6 +82,9 @@
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
/**
* Unit test for the FSDownload class.
*/
public class TestFSDownload { public class TestFSDownload {
private static final Log LOG = LogFactory.getLog(TestFSDownload.class); private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@ -90,7 +93,8 @@ public class TestFSDownload {
private enum TEST_FILE_TYPE { private enum TEST_FILE_TYPE {
TAR, JAR, ZIP, TGZ TAR, JAR, ZIP, TGZ
}; };
private Configuration conf = new Configuration();
@AfterClass @AfterClass
public static void deleteTestDir() throws IOException { public static void deleteTestDir() throws IOException {
FileContext fs = FileContext.getLocalFSFileContext(); FileContext fs = FileContext.getLocalFSFileContext();
@ -132,6 +136,18 @@ static LocalResource createJar(FileContext files, Path p,
FileOutputStream stream = new FileOutputStream(jarFile); FileOutputStream stream = new FileOutputStream(jarFile);
LOG.info("Create jar out stream "); LOG.info("Create jar out stream ");
JarOutputStream out = new JarOutputStream(stream, new Manifest()); JarOutputStream out = new JarOutputStream(stream, new Manifest());
ZipEntry entry = new ZipEntry("classes/1.class");
out.putNextEntry(entry);
out.write(1);
out.write(2);
out.write(3);
out.closeEntry();
ZipEntry entry2 = new ZipEntry("classes/2.class");
out.putNextEntry(entry2);
out.write(1);
out.write(2);
out.write(3);
out.closeEntry();
LOG.info("Done writing jar stream "); LOG.info("Done writing jar stream ");
out.close(); out.close();
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
@ -256,7 +272,6 @@ static LocalResource createZipFile(FileContext files, Path p, int len,
@Test (timeout=10000) @Test (timeout=10000)
public void testDownloadBadPublic() throws IOException, URISyntaxException, public void testDownloadBadPublic() throws IOException, URISyntaxException,
InterruptedException { InterruptedException {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf); FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target", final Path basedir = files.makeQualified(new Path("target",
@ -307,7 +322,6 @@ public void testDownloadBadPublic() throws IOException, URISyntaxException,
@Test (timeout=60000) @Test (timeout=60000)
public void testDownloadPublicWithStatCache() throws IOException, public void testDownloadPublicWithStatCache() throws IOException,
URISyntaxException, InterruptedException, ExecutionException { URISyntaxException, InterruptedException, ExecutionException {
final Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf); FileContext files = FileContext.getLocalFSFileContext(conf);
Path basedir = files.makeQualified(new Path("target", Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName())); TestFSDownload.class.getSimpleName()));
@ -382,7 +396,6 @@ public Boolean call() throws IOException {
@Test (timeout=10000) @Test (timeout=10000)
public void testDownload() throws IOException, URISyntaxException, public void testDownload() throws IOException, URISyntaxException,
InterruptedException { InterruptedException {
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf); FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target", final Path basedir = files.makeQualified(new Path("target",
@ -438,7 +451,7 @@ public void testDownload() throws IOException, URISyntaxException,
FileStatus status = files.getFileStatus(localized.getParent()); FileStatus status = files.getFileStatus(localized.getParent());
FsPermission perm = status.getPermission(); FsPermission perm = status.getPermission();
assertEquals("Cache directory permissions are incorrect", assertEquals("Cache directory permissions are incorrect",
new FsPermission((short)0755), perm); new FsPermission((short)0700), perm);
status = files.getFileStatus(localized); status = files.getFileStatus(localized);
perm = status.getPermission(); perm = status.getPermission();
@ -455,7 +468,6 @@ public void testDownload() throws IOException, URISyntaxException,
private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException,
URISyntaxException, InterruptedException{ URISyntaxException, InterruptedException{
Configuration conf = new Configuration();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
FileContext files = FileContext.getLocalFSFileContext(conf); FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target", final Path basedir = files.makeQualified(new Path("target",
@ -530,7 +542,7 @@ private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException,
} }
} }
@Test (timeout=10000) @Test (timeout=10000)
public void testDownloadArchive() throws IOException, URISyntaxException, public void testDownloadArchive() throws IOException, URISyntaxException,
InterruptedException { InterruptedException {
downloadWithFileType(TEST_FILE_TYPE.TAR); downloadWithFileType(TEST_FILE_TYPE.TAR);
@ -542,7 +554,7 @@ public void testDownloadPatternJar() throws IOException, URISyntaxException,
downloadWithFileType(TEST_FILE_TYPE.JAR); downloadWithFileType(TEST_FILE_TYPE.JAR);
} }
@Test (timeout=10000) @Test (timeout=10000)
public void testDownloadArchiveZip() throws IOException, URISyntaxException, public void testDownloadArchiveZip() throws IOException, URISyntaxException,
InterruptedException { InterruptedException {
downloadWithFileType(TEST_FILE_TYPE.ZIP); downloadWithFileType(TEST_FILE_TYPE.ZIP);
@ -603,7 +615,6 @@ private void verifyPermsRecursively(FileSystem fs,
@Test (timeout=10000) @Test (timeout=10000)
public void testDirDownload() throws IOException, InterruptedException { public void testDirDownload() throws IOException, InterruptedException {
Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf); FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target", final Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName())); TestFSDownload.class.getSimpleName()));
@ -668,7 +679,6 @@ public void testDirDownload() throws IOException, InterruptedException {
@Test (timeout=10000) @Test (timeout=10000)
public void testUniqueDestinationPath() throws Exception { public void testUniqueDestinationPath() throws Exception {
Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf); FileContext files = FileContext.getLocalFSFileContext(conf);
final Path basedir = files.makeQualified(new Path("target", final Path basedir = files.makeQualified(new Path("target",
TestFSDownload.class.getSimpleName())); TestFSDownload.class.getSimpleName()));