YARN-2624. Resource Localization fails on a cluster due to existing cache directories. Contributed by Anubhav Dhoot

This commit is contained in:
Jason Lowe 2014-10-02 17:39:34 +00:00
parent 5e0b49da9c
commit 29f520052e
6 changed files with 98 additions and 1 deletions
hadoop-yarn-project
CHANGES.txt
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src
main/java/org/apache/hadoop/yarn/server/nodemanager
test/java/org/apache/hadoop/yarn/server/nodemanager

@ -521,6 +521,9 @@ Release 2.6.0 - UNRELEASED
YARN-2617. Fixed NM to not send duplicate container status whose app is not YARN-2617. Fixed NM to not send duplicate container status whose app is not
running. (Jun Gong via jianhe) running. (Jun Gong via jianhe)
YARN-2624. Resource Localization fails on a cluster due to existing cache
directories (Anubhav Dhoot via jlowe)
Release 2.5.1 - 2014-09-05 Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

@ -222,7 +222,7 @@ public class ResourceLocalizationService extends CompositeService
FileContext lfs = getLocalFileContext(conf); FileContext lfs = getLocalFileContext(conf);
lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
if (!stateStore.canRecover()) { if (!stateStore.canRecover() || stateStore.isNewlyCreated()) {
cleanUpLocalDir(lfs,delService); cleanUpLocalDir(lfs,delService);
} }

@ -118,6 +118,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final byte[] EMPTY_VALUE = new byte[0]; private static final byte[] EMPTY_VALUE = new byte[0];
private DB db; private DB db;
private boolean isNewlyCreated;
public NMLeveldbStateStoreService() { public NMLeveldbStateStoreService() {
super(NMLeveldbStateStoreService.class.getName()); super(NMLeveldbStateStoreService.class.getName());
@ -134,6 +135,11 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} }
} }
@Override
public boolean isNewlyCreated() {
return isNewlyCreated;
}
@Override @Override
public List<RecoveredContainerState> loadContainersState() public List<RecoveredContainerState> loadContainersState()
@ -837,6 +843,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} catch (NativeDB.DBException e) { } catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating state database at " + dbfile); LOG.info("Creating state database at " + dbfile);
isNewlyCreated = true;
options.createIfMissing(true); options.createIfMissing(true);
try { try {
db = JniDBFactory.factory.open(dbfile, options); db = JniDBFactory.factory.open(dbfile, options);

@ -211,6 +211,9 @@ public abstract class NMStateStoreService extends AbstractService {
return true; return true;
} }
public boolean isNewlyCreated() {
return false;
}
/** /**
* Load the state of applications * Load the state of applications

@ -26,6 +26,7 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyShort; import static org.mockito.Matchers.anyShort;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
@ -38,11 +39,14 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -58,6 +62,10 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.security.AccessControlException;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -227,6 +235,74 @@ public class TestResourceLocalizationService {
} }
} }
@Test
public void testDirectoryCleanupOnNewlyCreatedStateStore()
throws IOException, URISyntaxException {
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
AsyncDispatcher dispatcher = new AsyncDispatcher();
dispatcher.init(new Configuration());
ContainerExecutor exec = mock(ContainerExecutor.class);
DeletionService delService = spy(new DeletionService(exec));
delService.init(conf);
delService.start();
List<Path> localDirs = new ArrayList<Path>();
String[] sDirs = new String[4];
for (int i = 0; i < 4; ++i) {
localDirs.add(lfs.makeQualified(new Path(basedir, i + "")));
sDirs[i] = localDirs.get(i).toString();
}
conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
LocalDirsHandlerService diskhandler = new LocalDirsHandlerService();
diskhandler.init(conf);
NMStateStoreService nmStateStoreService = mock(NMStateStoreService.class);
when(nmStateStoreService.canRecover()).thenReturn(true);
when(nmStateStoreService.isNewlyCreated()).thenReturn(true);
ResourceLocalizationService locService =
spy(new ResourceLocalizationService(dispatcher, exec, delService,
diskhandler,
nmStateStoreService));
doReturn(lfs)
.when(locService).getLocalFileContext(isA(Configuration.class));
try {
dispatcher.start();
// initialize ResourceLocalizationService
locService.init(conf);
final FsPermission defaultPerm = new FsPermission((short)0755);
// verify directory creation
for (Path p : localDirs) {
p = new Path((new URI(p.toString())).getPath());
Path usercache = new Path(p, ContainerLocalizer.USERCACHE);
verify(spylfs)
.rename(eq(usercache), any(Path.class), any(Options.Rename.class));
verify(spylfs)
.mkdir(eq(usercache),
eq(defaultPerm), eq(true));
Path publicCache = new Path(p, ContainerLocalizer.FILECACHE);
verify(spylfs)
.rename(eq(usercache), any(Path.class), any(Options.Rename.class));
verify(spylfs)
.mkdir(eq(publicCache),
eq(defaultPerm), eq(true));
Path nmPriv = new Path(p, ResourceLocalizationService.NM_PRIVATE_DIR);
verify(spylfs)
.rename(eq(usercache), any(Path.class), any(Options.Rename.class));
verify(spylfs).mkdir(eq(nmPriv),
eq(ResourceLocalizationService.NM_PRIVATE_PERM), eq(true));
}
} finally {
dispatcher.stop();
delService.stop();
}
}
@Test @Test
@SuppressWarnings("unchecked") // mocked generics @SuppressWarnings("unchecked") // mocked generics
public void testResourceRelease() throws Exception { public void testResourceRelease() throws Exception {

@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -123,6 +124,13 @@ public class TestNMLeveldbStateStoreService {
assertTrue(state.getUserResources().isEmpty()); assertTrue(state.getUserResources().isEmpty());
} }
@Test
public void testIsNewlyCreated() throws IOException {
assertTrue(stateStore.isNewlyCreated());
restartStateStore();
assertFalse(stateStore.isNewlyCreated());
}
@Test @Test
public void testEmptyState() throws IOException { public void testEmptyState() throws IOException {
assertTrue(stateStore.canRecover()); assertTrue(stateStore.canRecover());