YARN-1771. Reduce the number of NameNode operations during localization of
public resources using a cache. Contributed by Sangjin Lee git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1577391 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
@ -339,11 +339,11 @@ public class FileUtil {
/** Copy files between FileSystems. */
private static boolean copy(FileSystem srcFS, FileStatus srcStatus,
FileSystem dstFS, Path dst,
boolean deleteSource,
boolean overwrite,
Configuration conf) throws IOException {
public static boolean copy(FileSystem srcFS, FileStatus srcStatus,
FileSystem dstFS, Path dst,
boolean deleteSource,
boolean overwrite,
Configuration conf) throws IOException {
Path src = srcStatus.getPath();
dst = checkDest(src.getName(), dstFS, dst, overwrite);
if (srcStatus.isDirectory()) {
@ -18,19 +18,26 @@
package org.apache.hadoop.mapred;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
@ -38,7 +45,6 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -89,8 +95,26 @@ public class TestLocalDistributedCacheManager {
public void cleanup() throws Exception {
* Mock input stream based on a byte array so that it can be used by a
* FSDataInputStream.
private static class MockInputStream extends ByteArrayInputStream
implements Seekable, PositionedReadable {
public MockInputStream(byte[] buf) {
// empty implementation for unused methods
public int read(long position, byte[] buffer, int offset, int length) { return -1; }
public void readFully(long position, byte[] buffer, int offset, int length) {}
public void readFully(long position, byte[] buffer) {}
public void seek(long position) {}
public long getPos() { return 0; }
public boolean seekToNewSource(long targetPos) { return false; }
public void testDownload() throws Exception {
JobConf conf = new JobConf();
@ -123,28 +147,22 @@ public class TestLocalDistributedCacheManager {
doAnswer(new Answer() {
final FSDataInputStream in =
new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
public Object answer(InvocationOnMock args) throws Throwable {
//Ignored boolean overwrite = (Boolean) args.getArguments()[0];
Path src = (Path)args.getArguments()[1];
Path dst = (Path)args.getArguments()[2];
if("file.txt".equals(src.getName())) {
File f = new File(dst.toUri().getPath());
FileWriter writer = new FileWriter(f);
try {
writer.append("This is a test file\n");
} finally {
if(writer != null) writer.close();
public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
Path src = (Path)args.getArguments()[0];
if ("file.txt".equals(src.getName())) {
return in;
} else {
throw new FileNotFoundException(src+" not supported by mocking");
return null;
}).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
conf.set(MRJobConfig.CACHE_FILES_SIZES, "201");
@ -159,8 +177,7 @@ public class TestLocalDistributedCacheManager {
public void testEmptyDownload() throws Exception {
JobConf conf = new JobConf();
@ -184,16 +201,16 @@ public class TestLocalDistributedCacheManager {
throw new FileNotFoundException(p+" not supported by mocking");
doAnswer(new Answer() {
when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
public Object answer(InvocationOnMock args) throws Throwable {
//Ignored boolean overwrite = (Boolean) args.getArguments()[0];
Path src = (Path)args.getArguments()[1];
public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
Path src = (Path)args.getArguments()[0];
throw new FileNotFoundException(src+" not supported by mocking");
}).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
conf.set(MRJobConfig.CACHE_FILES, "");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
@ -203,9 +220,8 @@ public class TestLocalDistributedCacheManager {
public void testDuplicateDownload() throws Exception {
JobConf conf = new JobConf();
@ -238,28 +254,22 @@ public class TestLocalDistributedCacheManager {
doAnswer(new Answer() {
final FSDataInputStream in =
new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
public Object answer(InvocationOnMock args) throws Throwable {
//Ignored boolean overwrite = (Boolean) args.getArguments()[0];
Path src = (Path)args.getArguments()[1];
Path dst = (Path)args.getArguments()[2];
if("file.txt".equals(src.getName())) {
File f = new File(dst.toUri().getPath());
FileWriter writer = new FileWriter(f);
try {
writer.append("This is a test file\n");
} finally {
if(writer != null) writer.close();
public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
Path src = (Path)args.getArguments()[0];
if ("file.txt".equals(src.getName())) {
return in;
} else {
throw new FileNotFoundException(src+" not supported by mocking");
return null;
}).when(mockfs).copyToLocalFile(anyBoolean(), any(Path.class), any(Path.class));
DistributedCache.addCacheFile(file, conf);
DistributedCache.addCacheFile(file, conf);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
@ -288,6 +288,9 @@ Release 2.4.0 - UNRELEASED
expose analogous getApplication(s)/Attempt(s)/Container(s) APIs. (Mayank
Bansal via zjshen)
YARN-1771. Reduce the number of NameNode operations during localization of
public resources using a cache. (Sangjin Lee via cdouglas)
@ -24,6 +24,8 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@ -43,6 +45,11 @@ import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.Futures;
* Download a single URL to the local disk.
@ -56,6 +63,7 @@ public class FSDownload implements Callable<Path> {
private final UserGroupInformation userUgi;
private Configuration conf;
private LocalResource resource;
private final LoadingCache<Path,Future<FileStatus>> statCache;
/** The local FS dir path under which this resource is to be localized to */
private Path destDirPath;
@ -71,11 +79,18 @@ public class FSDownload implements Callable<Path> {
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
Path destDirPath, LocalResource resource) {
this(files, ugi, conf, destDirPath, resource, null);
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
Path destDirPath, LocalResource resource,
LoadingCache<Path,Future<FileStatus>> statCache) {
this.conf = conf;
this.destDirPath = destDirPath;
this.files = files;
this.userUgi = ugi;
this.resource = resource;
this.statCache = statCache;
LocalResource getResource() {
@ -90,28 +105,43 @@ public class FSDownload implements Callable<Path> {
* Returns a boolean to denote whether a cache file is visible to all(public)
* or not
* @param conf
* @param uri
* @return true if the path in the uri is visible to all, false otherwise
* @throws IOException
* Creates the cache loader for the status loading cache. This should be used
* to create an instance of the status cache that is passed into the
* FSDownload constructor.
private static boolean isPublic(FileSystem fs, Path current) throws IOException {
current = fs.makeQualified(current);
//the leaf level file should be readable by others
if (!checkPublicPermsForAll(fs, current, FsAction.READ_EXECUTE, FsAction.READ)) {
return false;
return ancestorsHaveExecutePermissions(fs, current.getParent());
public static CacheLoader<Path,Future<FileStatus>>
createStatusCacheLoader(final Configuration conf) {
return new CacheLoader<Path,Future<FileStatus>>() {
public Future<FileStatus> load(Path path) {
try {
FileSystem fs = path.getFileSystem(conf);
return Futures.immediateFuture(fs.getFileStatus(path));
} catch (Throwable th) {
// report failures so it can be memoized
return Futures.immediateFailedFuture(th);
private static boolean checkPublicPermsForAll(FileSystem fs, Path current,
FsAction dir, FsAction file)
throws IOException {
return checkPublicPermsForAll(fs, fs.getFileStatus(current), dir, file);
* Returns a boolean to denote whether a cache file is visible to all (public)
* or not
* @return true if the path in the current path is visible to all, false
* otherwise
static boolean isPublic(FileSystem fs, Path current, FileStatus sStat,
LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
current = fs.makeQualified(current);
//the leaf level file should be readable by others
if (!checkPublicPermsForAll(fs, sStat, FsAction.READ_EXECUTE, FsAction.READ)) {
return false;
return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache);
private static boolean checkPublicPermsForAll(FileSystem fs,
FileStatus status, FsAction dir, FsAction file)
throws IOException {
@ -137,12 +167,13 @@ public class FSDownload implements Callable<Path> {
* permission set for all users (i.e. that other users can traverse
* the directory heirarchy to the given path)
private static boolean ancestorsHaveExecutePermissions(FileSystem fs, Path path)
throws IOException {
private static boolean ancestorsHaveExecutePermissions(FileSystem fs,
Path path, LoadingCache<Path,Future<FileStatus>> statCache)
throws IOException {
Path current = path;
while (current != null) {
//the subdirs in the path should have execute permissions for others
if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE)) {
if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) {
return false;
current = current.getParent();
@ -160,14 +191,46 @@ public class FSDownload implements Callable<Path> {
* @throws IOException
private static boolean checkPermissionOfOther(FileSystem fs, Path path,
FsAction action) throws IOException {
FileStatus status = fs.getFileStatus(path);
FsAction action, LoadingCache<Path,Future<FileStatus>> statCache)
throws IOException {
FileStatus status = getFileStatus(fs, path, statCache);
FsPermission perms = status.getPermission();
FsAction otherAction = perms.getOtherAction();
return otherAction.implies(action);
* Obtains the file status, first by checking the stat cache if it is
* available, and then by getting it explicitly from the filesystem. If we got
* the file status from the filesystem, it is added to the stat cache.
* The stat cache is expected to be managed by callers who provided it to
* FSDownload.
private static FileStatus getFileStatus(final FileSystem fs, final Path path,
LoadingCache<Path,Future<FileStatus>> statCache) throws IOException {
// if the stat cache does not exist, simply query the filesystem
if (statCache == null) {
return fs.getFileStatus(path);
try {
// get or load it from the cache
return statCache.get(path).get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// the underlying exception should normally be IOException
if (cause instanceof IOException) {
throw (IOException)cause;
} else {
throw new IOException(cause);
} catch (InterruptedException e) { // should not happen
throw new IOException(e);
private Path copy(Path sCopy, Path dstdir) throws IOException {
FileSystem sourceFs = sCopy.getFileSystem(conf);
Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
@ -178,14 +241,15 @@ public class FSDownload implements Callable<Path> {
", was " + sStat.getModificationTime());
if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
if (!isPublic(sourceFs, sCopy)) {
if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
throw new IOException("Resource " + sCopy +
" is not publicly accessable and as such cannot be part of the" +
" public cache.");
sourceFs.copyToLocalFile(sCopy, dCopy);
FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
true, conf);
return dCopy;
@ -21,27 +21,35 @@ package org.apache.hadoop.yarn.util;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import java.util.zip.GZIPOutputStream;
import junit.framework.Assert;
@ -64,10 +72,13 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.AfterClass;
import org.junit.Test;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
public class TestFSDownload {
private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
@ -88,6 +99,18 @@ public class TestFSDownload {
static LocalResource createFile(FileContext files, Path p, int len,
Random r, LocalResourceVisibility vis) throws IOException {
createFile(files, p, len, r);
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
return ret;
static void createFile(FileContext files, Path p, int len, Random r)
throws IOException {
FSDataOutputStream out = null;
try {
byte[] bytes = new byte[len];
@ -97,13 +120,6 @@ public class TestFSDownload {
} finally {
if (out != null) out.close();
LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
return ret;
static LocalResource createJar(FileContext files, Path p,
@ -285,6 +301,76 @@ public class TestFSDownload {
@Test (timeout=60000)
public void testDownloadPublicWithStatCache() throws IOException,
URISyntaxException, InterruptedException, ExecutionException {
final Configuration conf = new Configuration();
FileContext files = FileContext.getLocalFSFileContext(conf);
Path basedir = files.makeQualified(new Path("target",
files.mkdir(basedir, null, true);
conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
int size = 512;
final ConcurrentMap<Path,AtomicInteger> counts =
new ConcurrentHashMap<Path,AtomicInteger>();
final CacheLoader<Path,Future<FileStatus>> loader =
final LoadingCache<Path,Future<FileStatus>> statCache =
CacheBuilder.newBuilder().build(new CacheLoader<Path,Future<FileStatus>>() {
public Future<FileStatus> load(Path path) throws Exception {
// increment the count
AtomicInteger count = counts.get(path);
if (count == null) {
count = new AtomicInteger(0);
AtomicInteger existing = counts.putIfAbsent(path, count);
if (existing != null) {
count = existing;
// use the default loader
return loader.load(path);
// test FSDownload.isPublic() concurrently
final int fileCount = 3;
List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
for (int i = 0; i < fileCount; i++) {
Random rand = new Random();
long sharedSeed = rand.nextLong();
System.out.println("SEED: " + sharedSeed);
final Path path = new Path(basedir, "test-file-" + i);
createFile(files, path, size, rand);
final FileSystem fs = path.getFileSystem(conf);
final FileStatus sStat = fs.getFileStatus(path);
tasks.add(new Callable<Boolean>() {
public Boolean call() throws IOException {
return FSDownload.isPublic(fs, path, sStat, statCache);
ExecutorService exec = Executors.newFixedThreadPool(fileCount);
try {
List<Future<Boolean>> futures = exec.invokeAll(tasks);
// files should be public
for (Future<Boolean> future: futures) {
// for each path exactly one file status call should be made
for (AtomicInteger count: counts.values()) {
assertSame(count.get(), 1);
} finally {
@Test (timeout=10000)
public void testDownload() throws IOException, URISyntaxException,
InterruptedException {
@ -18,20 +18,34 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import com.google.common.cache.LoadingCache;
public class LocalizerContext {
private final String user;
private final ContainerId containerId;
private final Credentials credentials;
private final LoadingCache<Path,Future<FileStatus>> statCache;
public LocalizerContext(String user, ContainerId containerId,
Credentials credentials) {
this(user, containerId, credentials, null);
public LocalizerContext(String user, ContainerId containerId,
Credentials credentials,
LoadingCache<Path,Future<FileStatus>> statCache) {
this.user = user;
this.containerId = containerId;
this.credentials = credentials;
this.statCache = statCache;
public String getUser() {
@ -46,4 +60,7 @@ public class LocalizerContext {
return credentials;
public LoadingCache<Path,Future<FileStatus>> getStatCache() {
return statCache;
@ -83,8 +83,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
@ -119,6 +119,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.FSDownload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class ResourceLocalizationService extends CompositeService
@ -362,8 +364,11 @@ public class ResourceLocalizationService extends CompositeService
private void handleInitContainerResources(
ContainerLocalizationRequestEvent rsrcReqs) {
Container c = rsrcReqs.getContainer();
// create a loading cache for the file statuses
LoadingCache<Path,Future<FileStatus>> statCache =
LocalizerContext ctxt = new LocalizerContext(
c.getUser(), c.getContainerId(), c.getCredentials());
c.getUser(), c.getContainerId(), c.getCredentials(), statCache);
Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
@ -680,7 +685,8 @@ public class ResourceLocalizationService extends CompositeService
// completing and being dequeued before pending updated
synchronized (pending) {
pending.put(queue.submit(new FSDownload(lfs, null, conf,
publicDirDestPath, resource)), request);
publicDirDestPath, resource, request.getContext().getStatCache())),
} catch (IOException e) {
Reference in New Issue
Block a user