From 6a482a88b8f56a4c5590e71ce6713d7f63830e92 Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Mon, 25 Mar 2013 18:23:59 +0000 Subject: [PATCH] YARN-71. Fix the NodeManager to clean up local-dirs on restart. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1460808 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/server/nodemanager/NodeManager.java | 13 +- .../ResourceLocalizationService.java | 79 ++++- .../nodemanager/TestNodeManagerReboot.java | 297 ++++++++++++++++++ 4 files changed, 390 insertions(+), 2 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 96bff2f0f2..c3c13eea3a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -124,6 +124,9 @@ Release 2.0.5-beta - UNRELEASED YARN-470. Support a way to disable resource monitoring on the NodeManager. (Siddharth Seth via hitesh) + + YARN-71. Fix the NodeManager to clean up local-dirs on restart. + (Xuan Gong via sseth) Release 2.0.4-alpha - UNRELEASED diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 5aa947f32b..7a53eb9033 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.util.Records; +import com.google.common.annotations.VisibleForTesting; + public class NodeManager extends CompositeService implements EventHandler { @@ -113,6 +115,10 @@ protected WebServer createWebServer(Context nmContext, return new WebServer(nmContext, resourceView, aclsManager, dirsHandler); } + protected DeletionService createDeletionService(ContainerExecutor exec) { + return new DeletionService(exec); + } + protected void doSecureLogin() throws IOException { SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB, YarnConfiguration.NM_PRINCIPAL); @@ -143,7 +149,7 @@ public void init(Configuration conf) { } catch (IOException e) { throw new YarnException("Failed to initialize container executor", e); } - DeletionService del = new DeletionService(exec); + DeletionService del = createDeletionService(exec); addService(del); // NodeManager level dispatcher @@ -351,6 +357,11 @@ ContainerManagerImpl getContainerManager() { return containerManager; } + @VisibleForTesting + Context getNMContext() { + return this.context; + } + public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index 9ca812e1dd..29971c5b65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -22,6 +22,7 @@ import java.io.DataOutputStream; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -53,8 +54,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.Credentials; @@ -175,9 +178,11 @@ public void init(Configuration conf) { this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { - // TODO queue deletions here, rather than NM init? FileContext lfs = getLocalFileContext(conf); lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); + + cleanUpLocalDir(lfs,delService); + List localDirs = dirsHandler.getLocalDirs(); for (String localDir : localDirs) { // $local/usercache @@ -926,4 +931,76 @@ public void run() { } + private void cleanUpLocalDir(FileContext lfs, DeletionService del) { + long currentTimeStamp = System.currentTimeMillis(); + for (String localDir : dirsHandler.getLocalDirs()) { + renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE, + currentTimeStamp); + renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE, + currentTimeStamp); + renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR, + currentTimeStamp); + try { + deleteLocalDir(lfs, del, localDir); + } catch (IOException e) { + // Do nothing, just give the warning + LOG.warn("Failed to delete localDir: " + localDir); + } + } + } + + private void renameLocalDir(FileContext lfs, String localDir, + String localSubDir, long currentTimeStamp) { + try { + lfs.rename(new Path(localDir, localSubDir), new Path( + localDir, localSubDir + "_DEL_" + currentTimeStamp)); + } catch (FileNotFoundException ex) { + // No need to handle this exception + // localSubDir may not be exist + } catch (Exception ex) { + // Do nothing, just give the warning + LOG.warn("Failed to rename the local file under " + + localDir + "/" + localSubDir); + } + } + + private void deleteLocalDir(FileContext lfs, DeletionService del, + String localDir) throws IOException { + RemoteIterator fileStatus = lfs.listStatus(new Path(localDir)); + if (fileStatus != null) { + while (fileStatus.hasNext()) { + FileStatus status = fileStatus.next(); + try { + if (status.getPath().getName().matches(".*" + + ContainerLocalizer.USERCACHE + "_DEL_.*")) { + cleanUpFilesFromSubDir(lfs, del, status.getPath()); + } else if (status.getPath().getName() + .matches(".*" + NM_PRIVATE_DIR + "_DEL_.*") + || + status.getPath().getName() + .matches(".*" + ContainerLocalizer.FILECACHE + "_DEL_.*")) { + del.delete(null, status.getPath(), new Path[] {}); + } + } catch (IOException ex) { + // Do nothing, just give the warning + LOG.warn("Failed to delete this local Directory: " + + status.getPath().getName()); + } + } + } + } + + private void cleanUpFilesFromSubDir(FileContext lfs, DeletionService del, + Path dirPath) throws IOException { + RemoteIterator fileStatus = lfs.listStatus(dirPath); + if (fileStatus != null) { + while (fileStatus.hasNext()) { + FileStatus status = fileStatus.next(); + String owner = status.getOwner(); + del.delete(owner, status.getPath(), new Path[] {}); + } + } + del.delete(null, dirPath, new Path[] {}); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java new file mode 100644 index 0000000000..10a85c7480 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -0,0 +1,297 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager; + +import static org.mockito.Mockito.*; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +public class TestNodeManagerReboot { + + static final File basedir = + new File("target", TestNodeManagerReboot.class.getName()); + static final File logsDir = new File(basedir, "logs"); + static final File nmLocalDir = new File(basedir, "nm0"); + static final File localResourceDir = new File(basedir, "resource"); + + static final String user = System.getProperty("user.name"); + private FileContext localFS; + + private MyNodeManager nm; + private DeletionService delService; + static final Log LOG = LogFactory.getLog(TestNodeManagerReboot.class); + + @Before + public void setup() throws UnsupportedFileSystemException { + localFS = FileContext.getLocalFSFileContext(); + } + + @After + public void tearDown() throws IOException, InterruptedException { + localFS.delete(new Path(basedir.getPath()), true); + if (nm != null) { + nm.stop(); + } + } + + @Test(timeout = 20000) + public void testClearLocalDirWhenNodeReboot() throws IOException { + nm = new MyNodeManager(); + nm.start(); + // create files under fileCache + createFiles(nmLocalDir.getAbsolutePath(), ContainerLocalizer.FILECACHE, 100); + localResourceDir.mkdirs(); + ContainerManagerImpl containerManager = nm.getContainerManager(); + + ContainerLaunchContext containerLaunchContext = + Records.newRecord(ContainerLaunchContext.class); + // Construct the Container-id + ContainerId cId = createContainerId(); + containerLaunchContext.setContainerId(cId); + + containerLaunchContext.setUser(user); + + URL localResourceUri = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(localResourceDir.getAbsolutePath()))); + + LocalResource localResource = + Records.newRecord(LocalResource.class); + localResource.setResource(localResourceUri); + localResource.setSize(-1); + localResource.setVisibility(LocalResourceVisibility.APPLICATION); + localResource.setType(LocalResourceType.FILE); + localResource.setTimestamp(localResourceDir.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, localResource); + containerLaunchContext.setLocalResources(localResources); + containerLaunchContext.setUser(containerLaunchContext.getUser()); + List commands = new ArrayList(); + containerLaunchContext.setCommands(commands); + containerLaunchContext.setResource(Records + .newRecord(Resource.class)); + containerLaunchContext.getResource().setMemory(1024); + StartContainerRequest startRequest = + Records.newRecord(StartContainerRequest.class); + startRequest.setContainerLaunchContext(containerLaunchContext); + containerManager.startContainer(startRequest); + + GetContainerStatusRequest request = + Records.newRecord(GetContainerStatusRequest.class); + request.setContainerId(cId); + Container container = + nm.getNMContext().getContainers().get(request.getContainerId()); + + final int MAX_TRIES = 20; + int numTries = 0; + while (!container.getContainerState().equals(ContainerState.DONE) + && numTries <= MAX_TRIES) { + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + // Do nothing + } + numTries++; + } + + Assert.assertEquals(ContainerState.DONE, container.getContainerState()); + + Assert.assertTrue( + "The container should create a subDir named currentUser: " + user + + "under localDir/usercache", + numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.USERCACHE) > 0); + + Assert.assertTrue("There should be files or Dirs under nm_private when " + + "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ResourceLocalizationService.NM_PRIVATE_DIR) > 0); + + nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT)); + + numTries = 0; + while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer + .USERCACHE) > 0 || numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.FILECACHE) > 0 || numOfLocalDirs(nmLocalDir + .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) + > 0) && numTries < MAX_TRIES) { + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + // Do nothing + } + numTries++; + } + + Assert.assertTrue("After NM reboots, all local files should be deleted", + numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer + .USERCACHE) == 0 && numOfLocalDirs(nmLocalDir.getAbsolutePath(), + ContainerLocalizer.FILECACHE) == 0 && numOfLocalDirs(nmLocalDir + .getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) + == 0); + verify(delService, times(1)).delete(eq(user), + argThat(new PathInclude(user))); + verify(delService, times(1)).delete( + (String) isNull(), + argThat(new PathInclude(ResourceLocalizationService.NM_PRIVATE_DIR + + "_DEL_"))); + verify(delService, times(1)).delete((String) isNull(), + argThat(new PathInclude(ContainerLocalizer.FILECACHE + "_DEL_"))); + verify(delService, times(1)).delete((String) isNull(), + argThat(new PathInclude(ContainerLocalizer.USERCACHE + "_DEL_"))); + + } + + private int numOfLocalDirs(String localDir, String localSubDir) { + File[] listOfFiles = new File(localDir, localSubDir).listFiles(); + if (listOfFiles == null) { + return 0; + } else { + return listOfFiles.length; + } + } + + private void createFiles(String dir, String subDir, int numOfFiles) { + for (int i = 0; i < numOfFiles; i++) { + File newFile = new File(dir + "/" + subDir, "file_" + (i + 1)); + try { + newFile.createNewFile(); + } catch (IOException e) { + // Do nothing + } + } + } + + private ContainerId createContainerId() { + ApplicationId appId = Records.newRecord(ApplicationId.class); + appId.setClusterTimestamp(0); + appId.setId(0); + ApplicationAttemptId appAttemptId = + Records.newRecord(ApplicationAttemptId.class); + appAttemptId.setApplicationId(appId); + appAttemptId.setAttemptId(1); + ContainerId containerId = + Records.newRecord(ContainerId.class); + containerId.setApplicationAttemptId(appAttemptId); + return containerId; + } + + private class MyNodeManager extends NodeManager { + + public MyNodeManager() { + super(); + this.init(createNMConfig()); + } + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + MockNodeStatusUpdater myNodeStatusUpdater = new MockNodeStatusUpdater( + context, dispatcher, healthChecker, metrics); + return myNodeStatusUpdater; + } + + @Override + protected DeletionService createDeletionService(ContainerExecutor exec) { + delService = spy(new DeletionService(exec)); + return delService; + } + + // mimic part of reboot process + @Override + public void handle(NodeManagerEvent event) { + switch (event.getType()) { + case SHUTDOWN: + this.stop(); + break; + case REBOOT: + this.stop(); + this.createNewMyNodeManager().start(); + break; + default: + LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); + } + } + + private MyNodeManager createNewMyNodeManager() { + return new MyNodeManager(); + } + + private YarnConfiguration createNMConfig() { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB + conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345"); + conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346"); + conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath()); + return conf; + } + } + + class PathInclude extends ArgumentMatcher { + + final String part; + + PathInclude(String part) { + this.part = part; + } + + @Override + public boolean matches(Object o) { + return ((Path) o).getName().indexOf(part) != -1; + } + } +}