HADOOP-10048. LocalDirAllocator should avoid holding locks while accessing the filesystem. Contributed by Jason Lowe.

This commit is contained in:
Junping Du 2016-06-07 09:18:58 -07:00
parent e620530301
commit c14c1b298e

View File

@ -20,9 +20,10 @@
import java.io.*; import java.io.*;
import java.util.*; import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.*; import org.apache.commons.logging.*;
import org.apache.hadoop.util.*; import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -247,74 +248,101 @@ private static class AllocatorPerContext {
private final Log LOG = private final Log LOG =
LogFactory.getLog(AllocatorPerContext.class); LogFactory.getLog(AllocatorPerContext.class);
private int dirNumLastAccessed;
private Random dirIndexRandomizer = new Random(); private Random dirIndexRandomizer = new Random();
private FileSystem localFS;
private DF[] dirDF = new DF[0];
private String contextCfgItemName; private String contextCfgItemName;
private String[] localDirs = new String[0];
private String savedLocalDirs = ""; // NOTE: the context must be accessed via a local reference as it
// may be updated at any time to reference a different context
private AtomicReference<Context> currentContext;
private static class Context {
private AtomicInteger dirNumLastAccessed = new AtomicInteger(0);
private FileSystem localFS;
private DF[] dirDF;
private Path[] localDirs;
private String savedLocalDirs;
public int getAndIncrDirNumLastAccessed() {
return getAndIncrDirNumLastAccessed(1);
}
public int getAndIncrDirNumLastAccessed(int delta) {
if (localDirs.length < 2 || delta == 0) {
return dirNumLastAccessed.get();
}
int oldval, newval;
do {
oldval = dirNumLastAccessed.get();
newval = (oldval + delta) % localDirs.length;
} while (!dirNumLastAccessed.compareAndSet(oldval, newval));
return oldval;
}
}
public AllocatorPerContext(String contextCfgItemName) { public AllocatorPerContext(String contextCfgItemName) {
this.contextCfgItemName = contextCfgItemName; this.contextCfgItemName = contextCfgItemName;
this.currentContext = new AtomicReference<Context>(new Context());
} }
/** This method gets called everytime before any read/write to make sure /** This method gets called everytime before any read/write to make sure
* that any change to localDirs is reflected immediately. * that any change to localDirs is reflected immediately.
*/ */
private synchronized void confChanged(Configuration conf) private Context confChanged(Configuration conf)
throws IOException { throws IOException {
Context ctx = currentContext.get();
String newLocalDirs = conf.get(contextCfgItemName); String newLocalDirs = conf.get(contextCfgItemName);
if (null == newLocalDirs) { if (null == newLocalDirs) {
throw new IOException(contextCfgItemName + " not configured"); throw new IOException(contextCfgItemName + " not configured");
} }
if (!newLocalDirs.equals(savedLocalDirs)) { if (!newLocalDirs.equals(ctx.savedLocalDirs)) {
localDirs = StringUtils.getTrimmedStrings(newLocalDirs); ctx = new Context();
localFS = FileSystem.getLocal(conf); String[] dirStrings = StringUtils.getTrimmedStrings(newLocalDirs);
int numDirs = localDirs.length; ctx.localFS = FileSystem.getLocal(conf);
ArrayList<String> dirs = new ArrayList<String>(numDirs); int numDirs = dirStrings.length;
ArrayList<Path> dirs = new ArrayList<Path>(numDirs);
ArrayList<DF> dfList = new ArrayList<DF>(numDirs); ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
for (int i = 0; i < numDirs; i++) { for (int i = 0; i < numDirs; i++) {
try { try {
// filter problematic directories // filter problematic directories
Path tmpDir = new Path(localDirs[i]); Path tmpDir = new Path(dirStrings[i]);
if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) { if(ctx.localFS.mkdirs(tmpDir)|| ctx.localFS.exists(tmpDir)) {
try { try {
File tmpFile = tmpDir.isAbsolute() File tmpFile = tmpDir.isAbsolute()
? new File(localFS.makeQualified(tmpDir).toUri()) ? new File(ctx.localFS.makeQualified(tmpDir).toUri())
: new File(localDirs[i]); : new File(dirStrings[i]);
DiskChecker.checkDir(tmpFile); DiskChecker.checkDir(tmpFile);
dirs.add(tmpFile.getPath()); dirs.add(new Path(tmpFile.getPath()));
dfList.add(new DF(tmpFile, 30000)); dfList.add(new DF(tmpFile, 30000));
} catch (DiskErrorException de) { } catch (DiskErrorException de) {
LOG.warn( localDirs[i] + " is not writable\n", de); LOG.warn(dirStrings[i] + " is not writable\n", de);
} }
} else { } else {
LOG.warn( "Failed to create " + localDirs[i]); LOG.warn("Failed to create " + dirStrings[i]);
} }
} catch (IOException ie) { } catch (IOException ie) {
LOG.warn( "Failed to create " + localDirs[i] + ": " + LOG.warn("Failed to create " + dirStrings[i] + ": " +
ie.getMessage() + "\n", ie); ie.getMessage() + "\n", ie);
} //ignore } //ignore
} }
localDirs = dirs.toArray(new String[dirs.size()]); ctx.localDirs = dirs.toArray(new Path[dirs.size()]);
dirDF = dfList.toArray(new DF[dirs.size()]); ctx.dirDF = dfList.toArray(new DF[dirs.size()]);
savedLocalDirs = newLocalDirs; ctx.savedLocalDirs = newLocalDirs;
if (dirs.size() > 0) { if (dirs.size() > 0) {
// randomize the first disk picked in the round-robin selection // randomize the first disk picked in the round-robin selection
dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size()); ctx.dirNumLastAccessed.set(dirIndexRandomizer.nextInt(dirs.size()));
}
}
} }
private Path createPath(String path, currentContext.set(ctx);
}
return ctx;
}
private Path createPath(Path dir, String path,
boolean checkWrite) throws IOException { boolean checkWrite) throws IOException {
Path file = new Path(new Path(localDirs[dirNumLastAccessed]), Path file = new Path(dir, path);
path);
if (checkWrite) { if (checkWrite) {
//check whether we are able to create a directory here. If the disk //check whether we are able to create a directory here. If the disk
//happens to be RDONLY we will fail //happens to be RDONLY we will fail
@ -334,7 +362,7 @@ private Path createPath(String path,
* @return the current directory index. * @return the current directory index.
*/ */
int getCurrentDirectoryIndex() { int getCurrentDirectoryIndex() {
return dirNumLastAccessed; return currentContext.get().dirNumLastAccessed.get();
} }
/** Get a path from the local FS. If size is known, we go /** Get a path from the local FS. If size is known, we go
@ -344,10 +372,10 @@ int getCurrentDirectoryIndex() {
* If size is not known, use roulette selection -- pick directories * If size is not known, use roulette selection -- pick directories
* with probability proportional to their available space. * with probability proportional to their available space.
*/ */
public synchronized Path getLocalPathForWrite(String pathStr, long size, public Path getLocalPathForWrite(String pathStr, long size,
Configuration conf, boolean checkWrite) throws IOException { Configuration conf, boolean checkWrite) throws IOException {
confChanged(conf); Context ctx = confChanged(conf);
int numDirs = localDirs.length; int numDirs = ctx.localDirs.length;
int numDirsSearched = 0; int numDirsSearched = 0;
//remove the leading slash from the path (to make sure that the uri //remove the leading slash from the path (to make sure that the uri
//resolution results in a valid path on the dir being checked) //resolution results in a valid path on the dir being checked)
@ -358,12 +386,12 @@ public synchronized Path getLocalPathForWrite(String pathStr, long size,
if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability if(size == SIZE_UNKNOWN) { //do roulette selection: pick dir with probability
//proportional to available size //proportional to available size
long[] availableOnDisk = new long[dirDF.length]; long[] availableOnDisk = new long[ctx.dirDF.length];
long totalAvailable = 0; long totalAvailable = 0;
//build the "roulette wheel" //build the "roulette wheel"
for(int i =0; i < dirDF.length; ++i) { for(int i =0; i < ctx.dirDF.length; ++i) {
availableOnDisk[i] = dirDF[i].getAvailable(); availableOnDisk[i] = ctx.dirDF[i].getAvailable();
totalAvailable += availableOnDisk[i]; totalAvailable += availableOnDisk[i];
} }
@ -380,8 +408,8 @@ public synchronized Path getLocalPathForWrite(String pathStr, long size,
randomPosition -= availableOnDisk[dir]; randomPosition -= availableOnDisk[dir];
dir++; dir++;
} }
dirNumLastAccessed = dir; ctx.dirNumLastAccessed.set(dir);
returnPath = createPath(pathStr, checkWrite); returnPath = createPath(ctx.localDirs[dir], pathStr, checkWrite);
if (returnPath == null) { if (returnPath == null) {
totalAvailable -= availableOnDisk[dir]; totalAvailable -= availableOnDisk[dir];
availableOnDisk[dir] = 0; // skip this disk availableOnDisk[dir] = 0; // skip this disk
@ -389,13 +417,19 @@ public synchronized Path getLocalPathForWrite(String pathStr, long size,
} }
} }
} else { } else {
while (numDirsSearched < numDirs && returnPath == null) { int dirNum = ctx.getAndIncrDirNumLastAccessed();
long capacity = dirDF[dirNumLastAccessed].getAvailable(); while (numDirsSearched < numDirs) {
long capacity = ctx.dirDF[dirNum].getAvailable();
if (capacity > size) { if (capacity > size) {
returnPath = createPath(pathStr, checkWrite); returnPath =
createPath(ctx.localDirs[dirNum], pathStr, checkWrite);
if (returnPath != null) {
ctx.getAndIncrDirNumLastAccessed(numDirsSearched);
break;
} }
dirNumLastAccessed++; }
dirNumLastAccessed = dirNumLastAccessed % numDirs; dirNum++;
dirNum = dirNum % numDirs;
numDirsSearched++; numDirsSearched++;
} }
} }
@ -432,10 +466,10 @@ public File createTmpFileForWrite(String pathStr, long size,
* configured dirs for the file's existence and return the complete * configured dirs for the file's existence and return the complete
* path to the file when we find one * path to the file when we find one
*/ */
public synchronized Path getLocalPathToRead(String pathStr, public Path getLocalPathToRead(String pathStr,
Configuration conf) throws IOException { Configuration conf) throws IOException {
confChanged(conf); Context ctx = confChanged(conf);
int numDirs = localDirs.length; int numDirs = ctx.localDirs.length;
int numDirsSearched = 0; int numDirsSearched = 0;
//remove the leading slash from the path (to make sure that the uri //remove the leading slash from the path (to make sure that the uri
//resolution results in a valid path on the dir being checked) //resolution results in a valid path on the dir being checked)
@ -443,8 +477,8 @@ public synchronized Path getLocalPathToRead(String pathStr,
pathStr = pathStr.substring(1); pathStr = pathStr.substring(1);
} }
while (numDirsSearched < numDirs) { while (numDirsSearched < numDirs) {
Path file = new Path(localDirs[numDirsSearched], pathStr); Path file = new Path(ctx.localDirs[numDirsSearched], pathStr);
if (localFS.exists(file)) { if (ctx.localFS.exists(file)) {
return file; return file;
} }
numDirsSearched++; numDirsSearched++;
@ -459,10 +493,10 @@ private static class PathIterator implements Iterator<Path>, Iterable<Path> {
private final FileSystem fs; private final FileSystem fs;
private final String pathStr; private final String pathStr;
private int i = 0; private int i = 0;
private final String[] rootDirs; private final Path[] rootDirs;
private Path next = null; private Path next = null;
private PathIterator(FileSystem fs, String pathStr, String[] rootDirs) private PathIterator(FileSystem fs, String pathStr, Path[] rootDirs)
throws IOException { throws IOException {
this.fs = fs; this.fs = fs;
this.pathStr = pathStr; this.pathStr = pathStr;
@ -517,21 +551,22 @@ public Iterator<Path> iterator() {
* @return all of the paths that exist under any of the roots * @return all of the paths that exist under any of the roots
* @throws IOException * @throws IOException
*/ */
synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr, Iterable<Path> getAllLocalPathsToRead(String pathStr,
Configuration conf) throws IOException { Configuration conf) throws IOException {
confChanged(conf); Context ctx = confChanged(conf);
if (pathStr.startsWith("/")) { if (pathStr.startsWith("/")) {
pathStr = pathStr.substring(1); pathStr = pathStr.substring(1);
} }
return new PathIterator(localFS, pathStr, localDirs); return new PathIterator(ctx.localFS, pathStr, ctx.localDirs);
} }
/** We search through all the configured dirs for the file's existence /** We search through all the configured dirs for the file's existence
* and return true when we find one * and return true when we find one
*/ */
public synchronized boolean ifExists(String pathStr,Configuration conf) { public boolean ifExists(String pathStr, Configuration conf) {
Context ctx = currentContext.get();
try { try {
int numDirs = localDirs.length; int numDirs = ctx.localDirs.length;
int numDirsSearched = 0; int numDirsSearched = 0;
//remove the leading slash from the path (to make sure that the uri //remove the leading slash from the path (to make sure that the uri
//resolution results in a valid path on the dir being checked) //resolution results in a valid path on the dir being checked)
@ -539,8 +574,8 @@ public synchronized boolean ifExists(String pathStr,Configuration conf) {
pathStr = pathStr.substring(1); pathStr = pathStr.substring(1);
} }
while (numDirsSearched < numDirs) { while (numDirsSearched < numDirs) {
Path file = new Path(localDirs[numDirsSearched], pathStr); Path file = new Path(ctx.localDirs[numDirsSearched], pathStr);
if (localFS.exists(file)) { if (ctx.localFS.exists(file)) {
return true; return true;
} }
numDirsSearched++; numDirsSearched++;