diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
index 1c216f430a..a4b158a85a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalDirAllocator.java
@@ -24,8 +24,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.util.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -78,11 +76,25 @@ public class LocalDirAllocator {
   /** Used when size of file to be allocated is unknown. */
   public static final int SIZE_UNKNOWN = -1;
 
+  private final DiskValidator diskValidator;
+
   /**Create an allocator object
    * @param contextCfgItemName
    */
   public LocalDirAllocator(String contextCfgItemName) {
     this.contextCfgItemName = contextCfgItemName;
+    try {
+      this.diskValidator = DiskValidatorFactory.getInstance(
+          BasicDiskValidator.NAME);
+    } catch (DiskErrorException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public LocalDirAllocator(String contextCfgItemName,
+      DiskValidator diskValidator) {
+    this.contextCfgItemName = contextCfgItemName;
+    this.diskValidator = diskValidator;
   }
 
   /** This method must be used to obtain the dir allocation context for a
@@ -96,7 +108,8 @@ private AllocatorPerContext obtainContext(String contextCfgItemName) {
     AllocatorPerContext l = contexts.get(contextCfgItemName);
     if (l == null) {
       contexts.put(contextCfgItemName,
-          (l = new AllocatorPerContext(contextCfgItemName)));
+          (l = new AllocatorPerContext(contextCfgItemName,
+              diskValidator)));
     }
     return l;
   }
@@ -255,6 +268,7 @@ private static class AllocatorPerContext {
     // NOTE: the context must be accessed via a local reference as it
     //       may be updated at any time to reference a different context
     private AtomicReference<Context> currentContext;
+    private final DiskValidator diskValidator;
 
     private static class Context {
       private AtomicInteger dirNumLastAccessed = new AtomicInteger(0);
@@ -280,9 +294,11 @@ public int getAndIncrDirNumLastAccessed(int delta) {
       }
     }
 
-    public AllocatorPerContext(String contextCfgItemName) {
+    public AllocatorPerContext(String contextCfgItemName,
+        DiskValidator diskValidator) {
       this.contextCfgItemName = contextCfgItemName;
       this.currentContext = new AtomicReference<Context>(new Context());
+      this.diskValidator = diskValidator;
     }
 
     /** This method gets called everytime before any read/write to make sure
@@ -312,7 +328,7 @@ private Context confChanged(Configuration conf)
               ? new File(ctx.localFS.makeQualified(tmpDir).toUri())
               : new File(dirStrings[i]);
 
-          DiskChecker.checkDir(tmpFile);
+          diskValidator.checkStatus(tmpFile);
           dirs.add(new Path(tmpFile.getPath()));
           dfList.add(new DF(tmpFile, 30000));
         } catch (DiskErrorException de) {
@@ -348,7 +364,7 @@ private Path createPath(Path dir, String path,
       //check whether we are able to create a directory here. If the disk
       //happens to be RDONLY we will fail
       try {
-        DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
+        diskValidator.checkStatus(new File(file.getParent().toUri().getPath()));
         return file;
       } catch (DiskErrorException d) {
         LOG.warn("Disk Error Exception: ", d);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
index 621cabc1b8..6eabd0dd5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
@@ -27,6 +27,9 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.DiskValidator;
+import org.apache.hadoop.util.DiskValidatorFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -155,13 +158,23 @@ public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException {
       String local = conf.get(YarnConfiguration.NM_LOCAL_DIRS);
       conf.set(NM_GOOD_LOCAL_DIRS,
           (local != null) ? local : "");
-      localDirsAllocator = new LocalDirAllocator(
-          NM_GOOD_LOCAL_DIRS);
-      String log = conf.get(YarnConfiguration.NM_LOG_DIRS);
-      conf.set(NM_GOOD_LOG_DIRS,
-          (log != null) ? log : "");
-      logDirsAllocator = new LocalDirAllocator(
-          NM_GOOD_LOG_DIRS);
+      String diskValidatorName = conf.get(YarnConfiguration.DISK_VALIDATOR,
+          YarnConfiguration.DEFAULT_DISK_VALIDATOR);
+      try {
+        DiskValidator diskValidator =
+            DiskValidatorFactory.getInstance(diskValidatorName);
+        localDirsAllocator = new LocalDirAllocator(
+            NM_GOOD_LOCAL_DIRS, diskValidator);
+        String log = conf.get(YarnConfiguration.NM_LOG_DIRS);
+        conf.set(NM_GOOD_LOG_DIRS,
+            (log != null) ? log : "");
+        logDirsAllocator = new LocalDirAllocator(
+            NM_GOOD_LOG_DIRS, diskValidator);
+      } catch (DiskErrorException e) {
+        throw new YarnRuntimeException(
+            "Failed to create DiskValidator of type " + diskValidatorName + "!",
+            e);
+      }
     }
 
     @Override