YARN-7300. DiskValidator is not used in LocalDirAllocator. (Szilard Nemeth via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-07-19 16:27:11 -07:00
parent f354f47f99
commit e6873dfde0
2 changed files with 42 additions and 13 deletions

View File

@ -24,8 +24,6 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -78,11 +76,25 @@ public class LocalDirAllocator {
/** Used when size of file to be allocated is unknown. */
public static final int SIZE_UNKNOWN = -1;
private final DiskValidator diskValidator;
/**Create an allocator object
* @param contextCfgItemName
*/
public LocalDirAllocator(String contextCfgItemName) {
this.contextCfgItemName = contextCfgItemName;
try {
this.diskValidator = DiskValidatorFactory.getInstance(
BasicDiskValidator.NAME);
} catch (DiskErrorException e) {
throw new RuntimeException(e);
}
}
public LocalDirAllocator(String contextCfgItemName,
DiskValidator diskValidator) {
this.contextCfgItemName = contextCfgItemName;
this.diskValidator = diskValidator;
}
/** This method must be used to obtain the dir allocation context for a
@ -96,7 +108,8 @@ private AllocatorPerContext obtainContext(String contextCfgItemName) {
AllocatorPerContext l = contexts.get(contextCfgItemName);
if (l == null) {
contexts.put(contextCfgItemName,
(l = new AllocatorPerContext(contextCfgItemName)));
(l = new AllocatorPerContext(contextCfgItemName,
diskValidator)));
}
return l;
}
@ -255,6 +268,7 @@ private static class AllocatorPerContext {
// NOTE: the context must be accessed via a local reference as it
// may be updated at any time to reference a different context
private AtomicReference<Context> currentContext;
private final DiskValidator diskValidator;
private static class Context {
private AtomicInteger dirNumLastAccessed = new AtomicInteger(0);
@ -280,9 +294,11 @@ public int getAndIncrDirNumLastAccessed(int delta) {
}
}
public AllocatorPerContext(String contextCfgItemName) {
public AllocatorPerContext(String contextCfgItemName,
DiskValidator diskValidator) {
this.contextCfgItemName = contextCfgItemName;
this.currentContext = new AtomicReference<Context>(new Context());
this.diskValidator = diskValidator;
}
/** This method gets called everytime before any read/write to make sure
@ -312,7 +328,7 @@ private Context confChanged(Configuration conf)
? new File(ctx.localFS.makeQualified(tmpDir).toUri())
: new File(dirStrings[i]);
DiskChecker.checkDir(tmpFile);
diskValidator.checkStatus(tmpFile);
dirs.add(new Path(tmpFile.getPath()));
dfList.add(new DF(tmpFile, 30000));
} catch (DiskErrorException de) {
@ -348,7 +364,7 @@ private Path createPath(Path dir, String path,
//check whether we are able to create a directory here. If the disk
//happens to be RDONLY we will fail
try {
DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
diskValidator.checkStatus(new File(file.getParent().toUri().getPath()));
return file;
} catch (DiskErrorException d) {
LOG.warn("Disk Error Exception: ", d);

View File

@ -27,6 +27,9 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskValidator;
import org.apache.hadoop.util.DiskValidatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -155,13 +158,23 @@ public MonitoringTimerTask(Configuration conf) throws YarnRuntimeException {
String local = conf.get(YarnConfiguration.NM_LOCAL_DIRS);
conf.set(NM_GOOD_LOCAL_DIRS,
(local != null) ? local : "");
String diskValidatorName = conf.get(YarnConfiguration.DISK_VALIDATOR,
YarnConfiguration.DEFAULT_DISK_VALIDATOR);
try {
DiskValidator diskValidator =
DiskValidatorFactory.getInstance(diskValidatorName);
localDirsAllocator = new LocalDirAllocator(
NM_GOOD_LOCAL_DIRS);
NM_GOOD_LOCAL_DIRS, diskValidator);
String log = conf.get(YarnConfiguration.NM_LOG_DIRS);
conf.set(NM_GOOD_LOG_DIRS,
(log != null) ? log : "");
logDirsAllocator = new LocalDirAllocator(
NM_GOOD_LOG_DIRS);
NM_GOOD_LOG_DIRS, diskValidator);
} catch (DiskErrorException e) {
throw new YarnRuntimeException(
"Failed to create DiskValidator of type " + diskValidatorName + "!",
e);
}
}
@Override