YARN-2755. NM fails to clean up usercache_DEL_<timestamp> dirs after YARN-661. Contributed by Siqi Li
This commit is contained in:
parent
179cab81e0
commit
73e626ad91
@ -801,6 +801,9 @@ Release 2.6.0 - UNRELEASED
|
|||||||
YARN-2769. Fixed the problem that timeline domain is not set in distributed shell
|
YARN-2769. Fixed the problem that timeline domain is not set in distributed shell
|
||||||
AM when using shell_command on Windows. (Varun Vasudev via zjshen)
|
AM when using shell_command on Windows. (Varun Vasudev via zjshen)
|
||||||
|
|
||||||
|
YARN-2755. NM fails to clean up usercache_DEL_<timestamp> dirs after
|
||||||
|
YARN-661 (Siqi Li via jlowe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -1324,7 +1324,7 @@ private void cleanUpFilesPerUserDir(FileContext lfs, DeletionService del,
|
|||||||
RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
|
RemoteIterator<FileStatus> userDirStatus = lfs.listStatus(userDirPath);
|
||||||
FileDeletionTask dependentDeletionTask =
|
FileDeletionTask dependentDeletionTask =
|
||||||
del.createFileDeletionTask(null, userDirPath, new Path[] {});
|
del.createFileDeletionTask(null, userDirPath, new Path[] {});
|
||||||
if (userDirStatus != null) {
|
if (userDirStatus != null && userDirStatus.hasNext()) {
|
||||||
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
|
List<FileDeletionTask> deletionTasks = new ArrayList<FileDeletionTask>();
|
||||||
while (userDirStatus.hasNext()) {
|
while (userDirStatus.hasNext()) {
|
||||||
FileStatus status = userDirStatus.next();
|
FileStatus status = userDirStatus.next();
|
||||||
|
@ -35,7 +35,9 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.FileContext;
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
@ -190,34 +192,9 @@ public Void run() throws YarnException, IOException {
|
|||||||
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
|
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
|
||||||
|
|
||||||
// restart the NodeManager
|
// restart the NodeManager
|
||||||
nm.stop();
|
restartNM(MAX_TRIES);
|
||||||
nm = new MyNodeManager();
|
checkNumOfLocalDirs();
|
||||||
nm.start();
|
|
||||||
|
|
||||||
numTries = 0;
|
|
||||||
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
|
||||||
ContainerLocalizer.USERCACHE) > 0
|
|
||||||
|| numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
|
||||||
ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(
|
|
||||||
nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0)
|
|
||||||
&& numTries < MAX_TRIES) {
|
|
||||||
try {
|
|
||||||
Thread.sleep(500);
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
// Do nothing
|
|
||||||
}
|
|
||||||
numTries++;
|
|
||||||
}
|
|
||||||
|
|
||||||
Assert
|
|
||||||
.assertTrue(
|
|
||||||
"After NM reboots, all local files should be deleted",
|
|
||||||
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
|
||||||
ContainerLocalizer.USERCACHE) == 0
|
|
||||||
&& numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
|
||||||
ContainerLocalizer.FILECACHE) == 0
|
|
||||||
&& numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
|
||||||
ResourceLocalizationService.NM_PRIVATE_DIR) == 0);
|
|
||||||
verify(delService, times(1)).delete(
|
verify(delService, times(1)).delete(
|
||||||
(String) isNull(),
|
(String) isNull(),
|
||||||
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
|
argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR
|
||||||
@ -230,8 +207,52 @@ && numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
|||||||
verify(delService, times(1)).scheduleFileDeletionTask(
|
verify(delService, times(1)).scheduleFileDeletionTask(
|
||||||
argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
|
argThat(new FileDeletionInclude(null, ContainerLocalizer.USERCACHE
|
||||||
+ "_DEL_", new String[] {})));
|
+ "_DEL_", new String[] {})));
|
||||||
|
|
||||||
|
// restart the NodeManager again
|
||||||
|
// this time usercache directory should be empty
|
||||||
|
restartNM(MAX_TRIES);
|
||||||
|
checkNumOfLocalDirs();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void restartNM(int maxTries) {
|
||||||
|
nm.stop();
|
||||||
|
nm = new MyNodeManager();
|
||||||
|
nm.start();
|
||||||
|
|
||||||
|
int numTries = 0;
|
||||||
|
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||||
|
ContainerLocalizer.USERCACHE) > 0
|
||||||
|
|| numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||||
|
ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(
|
||||||
|
nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0)
|
||||||
|
&& numTries < maxTries) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(500);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
numTries++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkNumOfLocalDirs() throws IOException {
|
||||||
|
Assert
|
||||||
|
.assertTrue(
|
||||||
|
"After NM reboots, all local files should be deleted",
|
||||||
|
numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||||
|
ContainerLocalizer.USERCACHE) == 0
|
||||||
|
&& numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||||
|
ContainerLocalizer.FILECACHE) == 0
|
||||||
|
&& numOfLocalDirs(nmLocalDir.getAbsolutePath(),
|
||||||
|
ResourceLocalizationService.NM_PRIVATE_DIR) == 0);
|
||||||
|
|
||||||
|
Assert
|
||||||
|
.assertTrue(
|
||||||
|
"After NM reboots, usercache_DEL_* directory should be deleted",
|
||||||
|
numOfUsercacheDELDirs(nmLocalDir.getAbsolutePath()) == 0);
|
||||||
|
}
|
||||||
|
|
||||||
private int numOfLocalDirs(String localDir, String localSubDir) {
|
private int numOfLocalDirs(String localDir, String localSubDir) {
|
||||||
File[] listOfFiles = new File(localDir, localSubDir).listFiles();
|
File[] listOfFiles = new File(localDir, localSubDir).listFiles();
|
||||||
if (listOfFiles == null) {
|
if (listOfFiles == null) {
|
||||||
@ -240,6 +261,19 @@ private int numOfLocalDirs(String localDir, String localSubDir) {
|
|||||||
return listOfFiles.length;
|
return listOfFiles.length;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int numOfUsercacheDELDirs(String localDir) throws IOException {
|
||||||
|
int count = 0;
|
||||||
|
RemoteIterator<FileStatus> fileStatus = localFS.listStatus(new Path(localDir));
|
||||||
|
while (fileStatus.hasNext()) {
|
||||||
|
FileStatus status = fileStatus.next();
|
||||||
|
if (status.getPath().getName().matches(".*" +
|
||||||
|
ContainerLocalizer.USERCACHE + "_DEL_.*")) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
private void createFiles(String dir, String subDir, int numOfFiles) {
|
private void createFiles(String dir, String subDir, int numOfFiles) {
|
||||||
for (int i = 0; i < numOfFiles; i++) {
|
for (int i = 0; i < numOfFiles; i++) {
|
||||||
|
Loading…
Reference in New Issue
Block a user