HADOOP-7580. Add a version of getLocalPathForWrite to LocalDirAllocator which doesn't create dirs. Contributed by Chris Douglas & Siddharth Seth.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1165473 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b0632df93a
commit
c2e1756d7a
@ -365,6 +365,9 @@ Release 0.23.0 - Unreleased
|
|||||||
HADOOP-7552. FileUtil#fullyDelete doesn't throw IOE but lists it
|
HADOOP-7552. FileUtil#fullyDelete doesn't throw IOE but lists it
|
||||||
in the throws clause. (eli)
|
in the throws clause. (eli)
|
||||||
|
|
||||||
|
HADOOP-7580. Add a version of getLocalPathForWrite to LocalDirAllocator
|
||||||
|
which doesn't create dirs. (Chris Douglas & Siddharth Seth via acmurthy)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole
|
HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole
|
||||||
|
@ -128,8 +128,26 @@ public Path getLocalPathForWrite(String pathStr,
|
|||||||
*/
|
*/
|
||||||
public Path getLocalPathForWrite(String pathStr, long size,
|
public Path getLocalPathForWrite(String pathStr, long size,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
|
return getLocalPathForWrite(pathStr, size, conf, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Get a path from the local FS. Pass size as
|
||||||
|
* SIZE_UNKNOWN if not known apriori. We
|
||||||
|
* round-robin over the set of disks (via the configured dirs) and return
|
||||||
|
* the first complete path which has enough space
|
||||||
|
* @param pathStr the requested path (this will be created on the first
|
||||||
|
* available disk)
|
||||||
|
* @param size the size of the file that is going to be written
|
||||||
|
* @param conf the Configuration object
|
||||||
|
* @param checkWrite ensure that the path is writable
|
||||||
|
* @return the complete path to the file on a local disk
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Path getLocalPathForWrite(String pathStr, long size,
|
||||||
|
Configuration conf,
|
||||||
|
boolean checkWrite) throws IOException {
|
||||||
AllocatorPerContext context = obtainContext(contextCfgItemName);
|
AllocatorPerContext context = obtainContext(contextCfgItemName);
|
||||||
return context.getLocalPathForWrite(pathStr, size, conf);
|
return context.getLocalPathForWrite(pathStr, size, conf, checkWrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get a path from the local FS for reading. We search through all the
|
/** Get a path from the local FS for reading. We search through all the
|
||||||
@ -146,6 +164,23 @@ public Path getLocalPathToRead(String pathStr,
|
|||||||
return context.getLocalPathToRead(pathStr, conf);
|
return context.getLocalPathToRead(pathStr, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all of the paths that currently exist in the working directories.
|
||||||
|
* @param pathStr the path underneath the roots
|
||||||
|
* @param conf the configuration to look up the roots in
|
||||||
|
* @return all of the paths that exist under any of the roots
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Iterable<Path> getAllLocalPathsToRead(String pathStr,
|
||||||
|
Configuration conf
|
||||||
|
) throws IOException {
|
||||||
|
AllocatorPerContext context;
|
||||||
|
synchronized (this) {
|
||||||
|
context = obtainContext(contextCfgItemName);
|
||||||
|
}
|
||||||
|
return context.getAllLocalPathsToRead(pathStr, conf);
|
||||||
|
}
|
||||||
|
|
||||||
/** Creates a temporary file in the local FS. Pass size as -1 if not known
|
/** Creates a temporary file in the local FS. Pass size as -1 if not known
|
||||||
* apriori. We round-robin over the set of disks (via the configured dirs)
|
* apriori. We round-robin over the set of disks (via the configured dirs)
|
||||||
* and select the first complete path which has enough space. A file is
|
* and select the first complete path which has enough space. A file is
|
||||||
@ -214,7 +249,8 @@ public AllocatorPerContext(String contextCfgItemName) {
|
|||||||
/** This method gets called everytime before any read/write to make sure
|
/** This method gets called everytime before any read/write to make sure
|
||||||
* that any change to localDirs is reflected immediately.
|
* that any change to localDirs is reflected immediately.
|
||||||
*/
|
*/
|
||||||
private void confChanged(Configuration conf) throws IOException {
|
private synchronized void confChanged(Configuration conf)
|
||||||
|
throws IOException {
|
||||||
String newLocalDirs = conf.get(contextCfgItemName);
|
String newLocalDirs = conf.get(contextCfgItemName);
|
||||||
if (!newLocalDirs.equals(savedLocalDirs)) {
|
if (!newLocalDirs.equals(savedLocalDirs)) {
|
||||||
localDirs = conf.getTrimmedStrings(contextCfgItemName);
|
localDirs = conf.getTrimmedStrings(contextCfgItemName);
|
||||||
@ -251,18 +287,22 @@ private void confChanged(Configuration conf) throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Path createPath(String path) throws IOException {
|
private Path createPath(String path,
|
||||||
|
boolean checkWrite) throws IOException {
|
||||||
Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
|
Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
|
||||||
path);
|
path);
|
||||||
//check whether we are able to create a directory here. If the disk
|
if (checkWrite) {
|
||||||
//happens to be RDONLY we will fail
|
//check whether we are able to create a directory here. If the disk
|
||||||
try {
|
//happens to be RDONLY we will fail
|
||||||
DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
|
try {
|
||||||
return file;
|
DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
|
||||||
} catch (DiskErrorException d) {
|
return file;
|
||||||
LOG.warn("Disk Error Exception: ", d);
|
} catch (DiskErrorException d) {
|
||||||
return null;
|
LOG.warn("Disk Error Exception: ", d);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return file;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -273,17 +313,6 @@ int getCurrentDirectoryIndex() {
|
|||||||
return dirNumLastAccessed;
|
return dirNumLastAccessed;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get a path from the local FS. This method should be used if the size of
|
|
||||||
* the file is not known a priori.
|
|
||||||
*
|
|
||||||
* It will use roulette selection, picking directories
|
|
||||||
* with probability proportional to their available space.
|
|
||||||
*/
|
|
||||||
public synchronized Path getLocalPathForWrite(String path,
|
|
||||||
Configuration conf) throws IOException {
|
|
||||||
return getLocalPathForWrite(path, SIZE_UNKNOWN, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Get a path from the local FS. If size is known, we go
|
/** Get a path from the local FS. If size is known, we go
|
||||||
* round-robin over the set of disks (via the configured dirs) and return
|
* round-robin over the set of disks (via the configured dirs) and return
|
||||||
* the first complete path which has enough space.
|
* the first complete path which has enough space.
|
||||||
@ -292,7 +321,7 @@ public synchronized Path getLocalPathForWrite(String path,
|
|||||||
* with probability proportional to their available space.
|
* with probability proportional to their available space.
|
||||||
*/
|
*/
|
||||||
public synchronized Path getLocalPathForWrite(String pathStr, long size,
|
public synchronized Path getLocalPathForWrite(String pathStr, long size,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf, boolean checkWrite) throws IOException {
|
||||||
confChanged(conf);
|
confChanged(conf);
|
||||||
int numDirs = localDirs.length;
|
int numDirs = localDirs.length;
|
||||||
int numDirsSearched = 0;
|
int numDirsSearched = 0;
|
||||||
@ -324,7 +353,7 @@ public synchronized Path getLocalPathForWrite(String pathStr, long size,
|
|||||||
dir++;
|
dir++;
|
||||||
}
|
}
|
||||||
dirNumLastAccessed = dir;
|
dirNumLastAccessed = dir;
|
||||||
returnPath = createPath(pathStr);
|
returnPath = createPath(pathStr, checkWrite);
|
||||||
if (returnPath == null) {
|
if (returnPath == null) {
|
||||||
totalAvailable -= availableOnDisk[dir];
|
totalAvailable -= availableOnDisk[dir];
|
||||||
availableOnDisk[dir] = 0; // skip this disk
|
availableOnDisk[dir] = 0; // skip this disk
|
||||||
@ -335,7 +364,7 @@ public synchronized Path getLocalPathForWrite(String pathStr, long size,
|
|||||||
while (numDirsSearched < numDirs && returnPath == null) {
|
while (numDirsSearched < numDirs && returnPath == null) {
|
||||||
long capacity = dirDF[dirNumLastAccessed].getAvailable();
|
long capacity = dirDF[dirNumLastAccessed].getAvailable();
|
||||||
if (capacity > size) {
|
if (capacity > size) {
|
||||||
returnPath = createPath(pathStr);
|
returnPath = createPath(pathStr, checkWrite);
|
||||||
}
|
}
|
||||||
dirNumLastAccessed++;
|
dirNumLastAccessed++;
|
||||||
dirNumLastAccessed = dirNumLastAccessed % numDirs;
|
dirNumLastAccessed = dirNumLastAccessed % numDirs;
|
||||||
@ -361,7 +390,7 @@ public File createTmpFileForWrite(String pathStr, long size,
|
|||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
|
|
||||||
// find an appropriate directory
|
// find an appropriate directory
|
||||||
Path path = getLocalPathForWrite(pathStr, size, conf);
|
Path path = getLocalPathForWrite(pathStr, size, conf, true);
|
||||||
File dir = new File(path.getParent().toUri().getPath());
|
File dir = new File(path.getParent().toUri().getPath());
|
||||||
String prefix = path.getName();
|
String prefix = path.getName();
|
||||||
|
|
||||||
@ -398,6 +427,74 @@ public synchronized Path getLocalPathToRead(String pathStr,
|
|||||||
" the configured local directories");
|
" the configured local directories");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class PathIterator implements Iterator<Path>, Iterable<Path> {
|
||||||
|
private final FileSystem fs;
|
||||||
|
private final String pathStr;
|
||||||
|
private int i = 0;
|
||||||
|
private final String[] rootDirs;
|
||||||
|
private Path next = null;
|
||||||
|
|
||||||
|
private PathIterator(FileSystem fs, String pathStr, String[] rootDirs)
|
||||||
|
throws IOException {
|
||||||
|
this.fs = fs;
|
||||||
|
this.pathStr = pathStr;
|
||||||
|
this.rootDirs = rootDirs;
|
||||||
|
advance();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() {
|
||||||
|
return next != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void advance() throws IOException {
|
||||||
|
while (i < rootDirs.length) {
|
||||||
|
next = new Path(rootDirs[i++], pathStr);
|
||||||
|
if (fs.exists(next)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
next = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Path next() {
|
||||||
|
Path result = next;
|
||||||
|
try {
|
||||||
|
advance();
|
||||||
|
} catch (IOException ie) {
|
||||||
|
throw new RuntimeException("Can't check existance of " + next, ie);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException("read only iterator");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<Path> iterator() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all of the paths that currently exist in the working directories.
|
||||||
|
* @param pathStr the path underneath the roots
|
||||||
|
* @param conf the configuration to look up the roots in
|
||||||
|
* @return all of the paths that exist under any of the roots
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
synchronized Iterable<Path> getAllLocalPathsToRead(String pathStr,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
confChanged(conf);
|
||||||
|
if (pathStr.startsWith("/")) {
|
||||||
|
pathStr = pathStr.substring(1);
|
||||||
|
}
|
||||||
|
return new PathIterator(localFS, pathStr, localDirs);
|
||||||
|
}
|
||||||
|
|
||||||
/** We search through all the configured dirs for the file's existence
|
/** We search through all the configured dirs for the file's existence
|
||||||
* and return true when we find one
|
* and return true when we find one
|
||||||
*/
|
*/
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.fs;
|
package org.apache.hadoop.fs;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -208,4 +209,33 @@ public void test4() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Two buffer dirs. The first dir does not exist & is on a read-only disk;
|
||||||
|
* The second dir exists & is RW
|
||||||
|
* getLocalPathForWrite with checkAccess set to false should create a parent
|
||||||
|
* directory. With checkAccess true, the directory should not be created.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public void testLocalPathForWriteDirCreation() throws IOException {
|
||||||
|
try {
|
||||||
|
conf.set(CONTEXT, BUFFER_DIR[0] + "," + BUFFER_DIR[1]);
|
||||||
|
assertTrue(localFs.mkdirs(BUFFER_PATH[1]));
|
||||||
|
BUFFER_ROOT.setReadOnly();
|
||||||
|
Path p1 =
|
||||||
|
dirAllocator.getLocalPathForWrite("p1/x", SMALL_FILE_SIZE, conf);
|
||||||
|
assertTrue(localFs.getFileStatus(p1.getParent()).isDirectory());
|
||||||
|
|
||||||
|
Path p2 =
|
||||||
|
dirAllocator.getLocalPathForWrite("p2/x", SMALL_FILE_SIZE, conf,
|
||||||
|
false);
|
||||||
|
try {
|
||||||
|
localFs.getFileStatus(p2.getParent());
|
||||||
|
} catch (Exception e) {
|
||||||
|
assertEquals(e.getClass(), FileNotFoundException.class);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
Shell.execCommand(new String[] { "chmod", "u+w", BUFFER_DIR_ROOT });
|
||||||
|
rmBufferDirs();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user