YARN-6708. Nodemanager container crash after ext3 folder limit. Contributed by Bibin A Chundatt
This commit is contained in:
parent
946dd25675
commit
7576a688ea
@ -31,6 +31,7 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.Stack;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.CompletionService;
|
import java.util.concurrent.CompletionService;
|
||||||
@ -60,6 +61,7 @@
|
|||||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||||
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||||
import org.apache.hadoop.yarn.api.records.SerializedException;
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
||||||
import org.apache.hadoop.yarn.api.records.URL;
|
import org.apache.hadoop.yarn.api.records.URL;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
@ -95,6 +97,8 @@ public class ContainerLocalizer {
|
|||||||
private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs";
|
private static final String USERCACHE_CTXT_FMT = "%s.user.cache.dirs";
|
||||||
private static final FsPermission FILECACHE_PERMS =
|
private static final FsPermission FILECACHE_PERMS =
|
||||||
new FsPermission((short)0710);
|
new FsPermission((short)0710);
|
||||||
|
private static final FsPermission USERCACHE_FOLDER_PERMS =
|
||||||
|
new FsPermission((short) 0755);
|
||||||
|
|
||||||
private final String user;
|
private final String user;
|
||||||
private final String appId;
|
private final String appId;
|
||||||
@ -237,10 +241,29 @@ Path doDownloadCall() throws Exception {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Callable<Path> download(Path path, LocalResource rsrc,
|
Callable<Path> download(Path destDirPath, LocalResource rsrc,
|
||||||
UserGroupInformation ugi) throws IOException {
|
UserGroupInformation ugi) throws IOException {
|
||||||
diskValidator.checkStatus(new File(path.toUri().getRawPath()));
|
// For private localization FsDownload creates folder in destDirPath. Parent
|
||||||
return new FSDownloadWrapper(lfs, ugi, conf, path, rsrc);
|
// directories till user filecache folder is created here.
|
||||||
|
if (rsrc.getVisibility() == LocalResourceVisibility.PRIVATE) {
|
||||||
|
createParentDirs(destDirPath);
|
||||||
|
}
|
||||||
|
diskValidator.checkStatus(new File(destDirPath.toUri().getRawPath()));
|
||||||
|
return new FSDownloadWrapper(lfs, ugi, conf, destDirPath, rsrc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createParentDirs(Path destDirPath) throws IOException {
|
||||||
|
Path parent = destDirPath.getParent();
|
||||||
|
Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(parent);
|
||||||
|
Stack<Path> dirs = new Stack<Path>();
|
||||||
|
while (!parent.equals(cacheRoot)) {
|
||||||
|
dirs.push(parent);
|
||||||
|
parent = parent.getParent();
|
||||||
|
}
|
||||||
|
// Create directories with user cache permission
|
||||||
|
while (!dirs.isEmpty()) {
|
||||||
|
createDir(lfs, dirs.pop(), USERCACHE_FOLDER_PERMS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static long getEstimatedSize(LocalResource rsrc) {
|
static long getEstimatedSize(LocalResource rsrc) {
|
||||||
@ -455,21 +478,21 @@ private static void initDirs(Configuration conf, String user, String appId,
|
|||||||
// $x/usercache/$user/filecache
|
// $x/usercache/$user/filecache
|
||||||
Path userFileCacheDir = new Path(base, FILECACHE);
|
Path userFileCacheDir = new Path(base, FILECACHE);
|
||||||
usersFileCacheDirs[i] = userFileCacheDir.toString();
|
usersFileCacheDirs[i] = userFileCacheDir.toString();
|
||||||
createDir(lfs, userFileCacheDir, FILECACHE_PERMS, false);
|
createDir(lfs, userFileCacheDir, FILECACHE_PERMS);
|
||||||
// $x/usercache/$user/appcache/$appId
|
// $x/usercache/$user/appcache/$appId
|
||||||
Path appBase = new Path(base, new Path(APPCACHE, appId));
|
Path appBase = new Path(base, new Path(APPCACHE, appId));
|
||||||
// $x/usercache/$user/appcache/$appId/filecache
|
// $x/usercache/$user/appcache/$appId/filecache
|
||||||
Path appFileCacheDir = new Path(appBase, FILECACHE);
|
Path appFileCacheDir = new Path(appBase, FILECACHE);
|
||||||
appsFileCacheDirs[i] = appFileCacheDir.toString();
|
appsFileCacheDirs[i] = appFileCacheDir.toString();
|
||||||
createDir(lfs, appFileCacheDir, FILECACHE_PERMS, false);
|
createDir(lfs, appFileCacheDir, FILECACHE_PERMS);
|
||||||
}
|
}
|
||||||
conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
|
conf.setStrings(String.format(APPCACHE_CTXT_FMT, appId), appsFileCacheDirs);
|
||||||
conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), usersFileCacheDirs);
|
conf.setStrings(String.format(USERCACHE_CTXT_FMT, user), usersFileCacheDirs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void createDir(FileContext lfs, Path dirPath,
|
private static void createDir(FileContext lfs, Path dirPath,
|
||||||
FsPermission perms, boolean createParent) throws IOException {
|
FsPermission perms) throws IOException {
|
||||||
lfs.mkdir(dirPath, perms, createParent);
|
lfs.mkdir(dirPath, perms, false);
|
||||||
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
|
if (!perms.equals(perms.applyUMask(lfs.getUMask()))) {
|
||||||
lfs.setPermission(dirPath, perms);
|
lfs.setPermission(dirPath, perms);
|
||||||
}
|
}
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -50,11 +51,12 @@
|
|||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.AbstractFileSystem;
|
import org.apache.hadoop.fs.AbstractFileSystem;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -80,7 +82,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentMatcher;
|
import org.mockito.ArgumentMatcher;
|
||||||
@ -88,12 +90,15 @@
|
|||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
public class TestContainerLocalizer {
|
public class TestContainerLocalizer {
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(TestContainerLocalizer.class);
|
static final Log LOG = LogFactory.getLog(TestContainerLocalizer.class);
|
||||||
static final Path basedir =
|
static final Path basedir =
|
||||||
new Path("target", TestContainerLocalizer.class.getName());
|
new Path("target", TestContainerLocalizer.class.getName());
|
||||||
static final FsPermission CACHE_DIR_PERM = new FsPermission((short)0710);
|
static final FsPermission CACHE_DIR_PERM = new FsPermission((short)0710);
|
||||||
|
static final FsPermission USERCACHE_DIR_PERM = new FsPermission((short) 0755);
|
||||||
|
|
||||||
static final String appUser = "yak";
|
static final String appUser = "yak";
|
||||||
static final String appId = "app_RM_0";
|
static final String appId = "app_RM_0";
|
||||||
@ -101,6 +106,10 @@ public class TestContainerLocalizer {
|
|||||||
static final InetSocketAddress nmAddr =
|
static final InetSocketAddress nmAddr =
|
||||||
new InetSocketAddress("foobar", 8040);
|
new InetSocketAddress("foobar", 8040);
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanUp() throws IOException {
|
||||||
|
FileUtils.deleteDirectory(new File(basedir.toUri().getRawPath()));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMain() throws Exception {
|
public void testMain() throws Exception {
|
||||||
@ -635,4 +644,34 @@ static DataInputBuffer createFakeCredentials(Random r, int nTok)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testUserCacheDirPermission() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
|
||||||
|
FileContext lfs = FileContext.getLocalFSFileContext(conf);
|
||||||
|
Path fileCacheDir = lfs.makeQualified(new Path(basedir, "filecache"));
|
||||||
|
lfs.mkdir(fileCacheDir, FsPermission.getDefault(), true);
|
||||||
|
RecordFactory recordFactory = mock(RecordFactory.class);
|
||||||
|
ContainerLocalizer localizer = new ContainerLocalizer(lfs,
|
||||||
|
UserGroupInformation.getCurrentUser().getUserName(), "application_01",
|
||||||
|
"container_01", new ArrayList<Path>(), recordFactory);
|
||||||
|
LocalResource rsrc = mock(LocalResource.class);
|
||||||
|
when(rsrc.getVisibility()).thenReturn(LocalResourceVisibility.PRIVATE);
|
||||||
|
Path destDirPath = new Path(fileCacheDir, "0/0/85");
|
||||||
|
//create one of the parent directories with the wrong permissions first
|
||||||
|
FsPermission wrongPerm = new FsPermission((short) 0700);
|
||||||
|
lfs.mkdir(destDirPath.getParent().getParent(), wrongPerm, false);
|
||||||
|
lfs.mkdir(destDirPath.getParent(), wrongPerm, false);
|
||||||
|
//Localize and check the directory permission are correct.
|
||||||
|
localizer
|
||||||
|
.download(destDirPath, rsrc, UserGroupInformation.getCurrentUser());
|
||||||
|
Assert
|
||||||
|
.assertEquals("Cache directory permissions filecache/0/0 is incorrect",
|
||||||
|
USERCACHE_DIR_PERM,
|
||||||
|
lfs.getFileStatus(destDirPath.getParent()).getPermission());
|
||||||
|
Assert.assertEquals("Cache directory permissions filecache/0 is incorrect",
|
||||||
|
USERCACHE_DIR_PERM,
|
||||||
|
lfs.getFileStatus(destDirPath.getParent().getParent()).getPermission());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user