HDFS-10768. Optimize mkdir ops. Contributed by Daryn Sharp.
This commit is contained in:
parent
cde3a00526
commit
8b7adf4ddf
@ -32,10 +32,7 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.AbstractMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
class FSDirMkdirOp {
|
||||
@ -63,7 +60,6 @@ static HdfsFileStatus mkdirs(FSNamesystem fsn, String src,
|
||||
throw new FileAlreadyExistsException("Path is not a directory: " + src);
|
||||
}
|
||||
|
||||
INodesInPath existing = lastINode != null ? iip : iip.getExistingINodes();
|
||||
if (lastINode == null) {
|
||||
if (fsd.isPermissionEnabled()) {
|
||||
fsd.checkAncestorAccess(pc, iip, FsAction.WRITE);
|
||||
@ -78,26 +74,20 @@ static HdfsFileStatus mkdirs(FSNamesystem fsn, String src,
|
||||
// create multiple inodes.
|
||||
fsn.checkFsObjectLimit();
|
||||
|
||||
List<String> nonExisting = iip.getPath(existing.length(),
|
||||
iip.length() - existing.length());
|
||||
int length = nonExisting.size();
|
||||
if (length > 1) {
|
||||
List<String> ancestors = nonExisting.subList(0, length - 1);
|
||||
// Ensure that the user can traversal the path by adding implicit
|
||||
// u+wx permission to all ancestor directories
|
||||
existing = createChildrenDirectories(fsd, existing, ancestors,
|
||||
addImplicitUwx(permissions, permissions));
|
||||
// u+wx permission to all ancestor directories.
|
||||
INodesInPath existing =
|
||||
createParentDirectories(fsd, iip, permissions, false);
|
||||
if (existing != null) {
|
||||
existing = createSingleDirectory(
|
||||
fsd, existing, iip.getLastLocalName(), permissions);
|
||||
}
|
||||
if (existing == null) {
|
||||
throw new IOException("Failed to create directory: " + src);
|
||||
}
|
||||
iip = existing;
|
||||
}
|
||||
|
||||
if ((existing = createChildrenDirectories(fsd, existing,
|
||||
nonExisting.subList(length - 1, length), permissions)) == null) {
|
||||
throw new IOException("Failed to create directory: " + src);
|
||||
}
|
||||
}
|
||||
return fsd.getAuditFileInfo(existing);
|
||||
return fsd.getAuditFileInfo(iip);
|
||||
} finally {
|
||||
fsd.writeUnlock();
|
||||
}
|
||||
@ -112,35 +102,18 @@ static HdfsFileStatus mkdirs(FSNamesystem fsn, String src,
|
||||
* For example, path="/foo/bar/spam", "/foo" is an existing directory,
|
||||
* "/foo/bar" is not existing yet, the function will create directory bar.
|
||||
*
|
||||
* @return a tuple which contains both the new INodesInPath (with all the
|
||||
* existing and newly created directories) and the last component in the
|
||||
* relative path. Or return null if there are errors.
|
||||
* @return a INodesInPath with all the existing and newly created
|
||||
* ancestor directories created.
|
||||
* Or return null if there are errors.
|
||||
*/
|
||||
static Map.Entry<INodesInPath, String> createAncestorDirectories(
|
||||
static INodesInPath createAncestorDirectories(
|
||||
FSDirectory fsd, INodesInPath iip, PermissionStatus permission)
|
||||
throws IOException {
|
||||
final String last = DFSUtil.bytes2String(iip.getLastLocalName());
|
||||
INodesInPath existing = iip.getExistingINodes();
|
||||
List<String> children = iip.getPath(existing.length(),
|
||||
iip.length() - existing.length());
|
||||
int size = children.size();
|
||||
if (size > 1) { // otherwise all ancestors have been created
|
||||
List<String> directories = children.subList(0, size - 1);
|
||||
INode parentINode = existing.getLastINode();
|
||||
// Ensure that the user can traversal the path by adding implicit
|
||||
// u+wx permission to all ancestor directories
|
||||
existing = createChildrenDirectories(fsd, existing, directories,
|
||||
addImplicitUwx(parentINode.getPermissionStatus(), permission));
|
||||
if (existing == null) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return new AbstractMap.SimpleImmutableEntry<>(existing, last);
|
||||
return createParentDirectories(fsd, iip, permission, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the directory {@code parent} / {@code children} and all ancestors
|
||||
* along the path.
|
||||
* Create all ancestor directories and return the parent inodes.
|
||||
*
|
||||
* @param fsd FSDirectory
|
||||
* @param existing The INodesInPath instance containing all the existing
|
||||
@ -149,21 +122,35 @@ static Map.Entry<INodesInPath, String> createAncestorDirectories(
|
||||
* starting with "/"
|
||||
* @param perm the permission of the directory. Note that all ancestors
|
||||
* created along the path has implicit {@code u+wx} permissions.
|
||||
* @param inheritPerms if the ancestor directories should inherit permissions
|
||||
* or use the specified permissions.
|
||||
*
|
||||
* @return {@link INodesInPath} which contains all inodes to the
|
||||
* target directory, After the execution parentPath points to the path of
|
||||
* the returned INodesInPath. The function return null if the operation has
|
||||
* failed.
|
||||
*/
|
||||
private static INodesInPath createChildrenDirectories(FSDirectory fsd,
|
||||
INodesInPath existing, List<String> children, PermissionStatus perm)
|
||||
private static INodesInPath createParentDirectories(FSDirectory fsd,
|
||||
INodesInPath iip, PermissionStatus perm, boolean inheritPerms)
|
||||
throws IOException {
|
||||
assert fsd.hasWriteLock();
|
||||
|
||||
for (String component : children) {
|
||||
// this is the desired parent iip if the subsequent delta is 1.
|
||||
INodesInPath existing = iip.getExistingINodes();
|
||||
int missing = iip.length() - existing.length();
|
||||
if (missing == 0) { // full path exists, return parents.
|
||||
existing = iip.getParentINodesInPath();
|
||||
} else if (missing > 1) { // need to create at least one ancestor dir.
|
||||
// Ensure that the user can traversal the path by adding implicit
|
||||
// u+wx permission to all ancestor directories.
|
||||
PermissionStatus basePerm = inheritPerms
|
||||
? existing.getLastINode().getPermissionStatus()
|
||||
: perm;
|
||||
perm = addImplicitUwx(basePerm, perm);
|
||||
// create all the missing directories.
|
||||
final int last = iip.length() - 2;
|
||||
for (int i = existing.length(); existing != null && i <= last; i++) {
|
||||
byte[] component = iip.getPathComponent(i);
|
||||
existing = createSingleDirectory(fsd, existing, component, perm);
|
||||
if (existing == null) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return existing;
|
||||
@ -183,11 +170,11 @@ static void mkdirForEditLog(FSDirectory fsd, long inodeId, String src,
|
||||
}
|
||||
|
||||
private static INodesInPath createSingleDirectory(FSDirectory fsd,
|
||||
INodesInPath existing, String localName, PermissionStatus perm)
|
||||
INodesInPath existing, byte[] localName, PermissionStatus perm)
|
||||
throws IOException {
|
||||
assert fsd.hasWriteLock();
|
||||
existing = unprotectedMkdir(fsd, fsd.allocateNewInodeId(), existing,
|
||||
DFSUtil.string2Bytes(localName), perm, null, now());
|
||||
localName, perm, null, now());
|
||||
if (existing == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -27,7 +27,6 @@
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.util.Time.now;
|
||||
|
||||
@ -99,21 +98,21 @@ private static INodeSymlink addSymlink(FSDirectory fsd, String path,
|
||||
INodesInPath iip, String target, PermissionStatus dirPerms,
|
||||
boolean createParent, boolean logRetryCache) throws IOException {
|
||||
final long mtime = now();
|
||||
final byte[] localName = iip.getLastLocalName();
|
||||
final INodesInPath parent;
|
||||
if (createParent) {
|
||||
Map.Entry<INodesInPath, String> e = FSDirMkdirOp
|
||||
.createAncestorDirectories(fsd, iip, dirPerms);
|
||||
if (e == null) {
|
||||
parent = FSDirMkdirOp.createAncestorDirectories(fsd, iip, dirPerms);
|
||||
if (parent == null) {
|
||||
return null;
|
||||
}
|
||||
iip = INodesInPath.append(e.getKey(), null, localName);
|
||||
} else {
|
||||
parent = iip.getParentINodesInPath();
|
||||
}
|
||||
final String userName = dirPerms.getUserName();
|
||||
long id = fsd.allocateNewInodeId();
|
||||
PermissionStatus perm = new PermissionStatus(
|
||||
userName, null, FsPermission.getDefault());
|
||||
INodeSymlink newNode = unprotectedAddSymlink(fsd, iip.getExistingINodes(),
|
||||
localName, id, target, mtime, mtime, perm);
|
||||
INodeSymlink newNode = unprotectedAddSymlink(fsd, parent,
|
||||
iip.getLastLocalName(), id, target, mtime, mtime, perm);
|
||||
if (newNode == null) {
|
||||
NameNode.stateChangeLog.info("addSymlink: failed to add " + path);
|
||||
return null;
|
||||
|
@ -65,7 +65,6 @@
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
|
||||
@ -419,10 +418,10 @@ static HdfsFileStatus startFile(
|
||||
}
|
||||
fsn.checkFsObjectLimit();
|
||||
INodeFile newNode = null;
|
||||
Map.Entry<INodesInPath, String> parent = FSDirMkdirOp
|
||||
.createAncestorDirectories(fsd, iip, permissions);
|
||||
INodesInPath parent =
|
||||
FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
|
||||
if (parent != null) {
|
||||
iip = addFile(fsd, parent.getKey(), parent.getValue(), permissions,
|
||||
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
|
||||
replication, blockSize, holder, clientMachine);
|
||||
newNode = iip != null ? iip.getLastINode().asFile() : null;
|
||||
}
|
||||
@ -572,7 +571,7 @@ private static BlockInfo addBlock(FSDirectory fsd, String path,
|
||||
* @return the new INodesInPath instance that contains the new INode
|
||||
*/
|
||||
private static INodesInPath addFile(
|
||||
FSDirectory fsd, INodesInPath existing, String localName,
|
||||
FSDirectory fsd, INodesInPath existing, byte[] localName,
|
||||
PermissionStatus permissions, short replication, long preferredBlockSize,
|
||||
String clientName, String clientMachine)
|
||||
throws IOException {
|
||||
@ -589,7 +588,7 @@ private static INodesInPath addFile(
|
||||
}
|
||||
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
|
||||
modTime, modTime, replication, preferredBlockSize, ecPolicy != null);
|
||||
newNode.setLocalName(DFSUtil.string2Bytes(localName));
|
||||
newNode.setLocalName(localName);
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
newiip = fsd.addINode(existing, newNode);
|
||||
} finally {
|
||||
@ -597,12 +596,13 @@ private static INodesInPath addFile(
|
||||
}
|
||||
if (newiip == null) {
|
||||
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
|
||||
existing.getPath() + "/" + localName);
|
||||
existing.getPath() + "/" + DFSUtil.bytes2String(localName));
|
||||
return null;
|
||||
}
|
||||
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* addFile: " + localName + " is added");
|
||||
NameNode.stateChangeLog.debug("DIR* addFile: " +
|
||||
DFSUtil.bytes2String(localName) + " is added");
|
||||
}
|
||||
return newiip;
|
||||
}
|
||||
|
@ -22,7 +22,6 @@
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -361,6 +360,10 @@ public byte[][] getPathComponents() {
|
||||
return path;
|
||||
}
|
||||
|
||||
public byte[] getPathComponent(int i) {
|
||||
return path[i];
|
||||
}
|
||||
|
||||
/** @return the full path in string form */
|
||||
public String getPath() {
|
||||
return DFSUtil.byteArray2PathString(path);
|
||||
@ -374,21 +377,6 @@ public String getPath(int pos) {
|
||||
return DFSUtil.byteArray2PathString(path, 0, pos + 1); // it's a length...
|
||||
}
|
||||
|
||||
/**
|
||||
* @param offset start endpoint (inclusive)
|
||||
* @param length number of path components
|
||||
* @return sub-list of the path
|
||||
*/
|
||||
public List<String> getPath(int offset, int length) {
|
||||
Preconditions.checkArgument(offset >= 0 && length >= 0 && offset + length
|
||||
<= path.length);
|
||||
ImmutableList.Builder<String> components = ImmutableList.builder();
|
||||
for (int i = offset; i < offset + length; i++) {
|
||||
components.add(DFSUtil.bytes2String(path[i]));
|
||||
}
|
||||
return components.build();
|
||||
}
|
||||
|
||||
public int length() {
|
||||
return inodes.length;
|
||||
}
|
||||
@ -429,22 +417,17 @@ public INodesInPath getParentINodesInPath() {
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a new INodesInPath instance that only contains exisitng INodes.
|
||||
* @return a new INodesInPath instance that only contains existing INodes.
|
||||
* Note that this method only handles non-snapshot paths.
|
||||
*/
|
||||
public INodesInPath getExistingINodes() {
|
||||
Preconditions.checkState(!isSnapshot());
|
||||
int i = 0;
|
||||
for (; i < inodes.length; i++) {
|
||||
if (inodes[i] == null) {
|
||||
break;
|
||||
for (int i = inodes.length; i > 0; i--) {
|
||||
if (inodes[i - 1] != null) {
|
||||
return (i == inodes.length) ? this : getAncestorINodesInPath(i);
|
||||
}
|
||||
}
|
||||
INode[] existing = new INode[i];
|
||||
byte[][] existingPath = new byte[i][];
|
||||
System.arraycopy(inodes, 0, existing, 0, i);
|
||||
System.arraycopy(path, 0, existingPath, 0, i);
|
||||
return new INodesInPath(existing, existingPath, isRaw, false, snapshotId);
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -146,6 +146,11 @@ public void testNonSnapshotPathINodes() throws Exception {
|
||||
// The returned nodesInPath should be non-snapshot
|
||||
assertSnapshot(nodesInPath, false, null, -1);
|
||||
|
||||
// verify components are correct
|
||||
for (int i=0; i < components.length; i++) {
|
||||
assertEquals(components[i], nodesInPath.getPathComponent(i));
|
||||
}
|
||||
|
||||
// The last INode should be associated with file1
|
||||
assertTrue("file1=" + file1 + ", nodesInPath=" + nodesInPath,
|
||||
nodesInPath.getINode(components.length - 1) != null);
|
||||
|
Loading…
Reference in New Issue
Block a user