HDFS-6480. Move waitForReady() from FSDirectory to FSNamesystem. Contributed by Haohui Mai.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603705 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Haohui Mai 2014-06-19 04:13:56 +00:00
parent eb93f73ea8
commit a4e0ff5e05
9 changed files with 122 additions and 100 deletions

View File

@ -454,6 +454,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6530. Fix Balancer documentation. (szetszwo)
HDFS-6480. Move waitForReady() from FSDirectory to FSNamesystem. (wheat9)
OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

View File

@ -252,7 +252,7 @@ void doCheckpoint() throws IOException {
backupNode.namesystem.writeLock();
try {
backupNode.namesystem.dir.setReady();
backupNode.namesystem.setImageLoaded();
if(backupNode.namesystem.getBlocksTotal() > 0) {
backupNode.namesystem.setBlockTotal();
}

View File

@ -26,11 +26,10 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -83,15 +82,14 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/*************************************************
* FSDirectory stores the filesystem directory state.
* It handles writing/loading values to disk, and logging
* changes as we go.
*
* It keeps the filename->blockset mapping always-current
* and logged to disk.
*
*************************************************/
/**
* Both FSDirectory and FSNamesystem manage the state of the namespace.
* FSDirectory is a pure in-memory data structure, all of whose operations
* happen entirely in memory. In contrast, FSNamesystem persists the operations
* to the disk.
* @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem
**/
@InterfaceAudience.Private
public class FSDirectory implements Closeable {
private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
final INodeDirectory r = new INodeDirectory(
@ -120,7 +118,6 @@ private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
INodeDirectory rootDir;
FSImage fsImage;
private final FSNamesystem namesystem;
private volatile boolean ready = false;
private volatile boolean skipQuotaCheck = false; //skip while consuming edits
private final int maxComponentLength;
private final int maxDirItems;
@ -132,7 +129,6 @@ private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
// lock to protect the directory and BlockMap
private final ReentrantReadWriteLock dirLock;
private final Condition cond;
// utility methods to acquire and release read lock and write lock
void readLock() {
@ -175,7 +171,6 @@ public int getWriteHoldCount() {
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
this.dirLock = new ReentrantReadWriteLock(true); // fair
this.cond = dirLock.writeLock().newCondition();
rootDir = createRoot(ns);
inodeMap = INodeMap.newInstance(rootDir);
this.fsImage = fsImage;
@ -231,38 +226,6 @@ public INodeDirectory getRoot() {
return rootDir;
}
/**
* Notify that loading of this FSDirectory is complete, and
* it is ready for use
*/
void imageLoadComplete() {
Preconditions.checkState(!ready, "FSDirectory already loaded");
setReady();
}
void setReady() {
if(ready) return;
writeLock();
try {
setReady(true);
this.nameCache.initialized();
cond.signalAll();
} finally {
writeUnlock();
}
}
//This is for testing purposes only
@VisibleForTesting
boolean isReady() {
return ready;
}
// exposed for unit tests
protected void setReady(boolean flag) {
ready = flag;
}
/**
* Shutdown the filestore
*/
@ -271,22 +234,12 @@ public void close() throws IOException {
fsImage.close();
}
/**
* Block until the object is ready to be used.
*/
void waitForReady() {
if (!ready) {
writeLock();
try {
while (!ready) {
try {
cond.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
}
} finally {
writeUnlock();
}
void markNameCacheInitialized() {
writeLock();
try {
nameCache.initialized();
} finally {
writeUnlock();
}
}
@ -312,7 +265,6 @@ INodeFile addFile(String path, PermissionStatus permissions,
String clientMachine, DatanodeDescriptor clientNode)
throws FileAlreadyExistsException, QuotaExceededException,
UnresolvedLinkException, SnapshotAccessControlException, AclException {
waitForReady();
long modTime = now();
INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null,
@ -385,8 +337,6 @@ INodeFile unprotectedAddFile( long id,
*/
BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
DatanodeStorageInfo[] targets) throws IOException {
waitForReady();
writeLock();
try {
final INodeFile fileINode = inodesInPath.getLastINode().asFile();
@ -424,8 +374,6 @@ BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
boolean removeBlock(String path, INodeFile fileNode, Block block)
throws IOException {
Preconditions.checkArgument(fileNode.isUnderConstruction());
waitForReady();
writeLock();
try {
return unprotectedRemoveBlock(path, fileNode, block);
@ -469,7 +417,6 @@ boolean renameTo(String src, String dst, long mtime)
NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
+src+" to "+dst);
}
waitForReady();
writeLock();
try {
if (!unprotectedRenameTo(src, dst, mtime))
@ -492,7 +439,6 @@ void renameTo(String src, String dst, long mtime,
NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src
+ " to " + dst);
}
waitForReady();
writeLock();
try {
if (unprotectedRenameTo(src, dst, mtime, options)) {
@ -1024,7 +970,6 @@ boolean unprotectedRenameTo(String src, String dst, long timestamp,
Block[] setReplication(String src, short replication, short[] blockRepls)
throws QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException {
waitForReady();
writeLock();
try {
return unprotectedSetReplication(src, replication, blockRepls);
@ -1147,7 +1092,6 @@ void concat(String target, String[] srcs, long timestamp)
writeLock();
try {
// actual move
waitForReady();
unprotectedConcat(target, srcs, timestamp);
} finally {
writeUnlock();
@ -1230,7 +1174,6 @@ long delete(String src, BlocksMapUpdateInfo collectedBlocks,
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
}
waitForReady();
final long filesRemoved;
writeLock();
try {
@ -1703,7 +1646,7 @@ private void updateCount(INodesInPath iip, int numOfINodes,
long nsDelta, long dsDelta, boolean checkQuota)
throws QuotaExceededException {
assert hasWriteLock();
if (!ready) {
if (!namesystem.isImageLoaded()) {
//still initializing. do not check or update quotas.
return;
}
@ -1896,7 +1839,7 @@ private static void verifyQuota(INode[] inodes, int pos, long nsDelta,
*/
private void verifyQuotaForRename(INode[] src, INode[] dst)
throws QuotaExceededException {
if (!ready || skipQuotaCheck) {
if (!namesystem.isImageLoaded() || skipQuotaCheck) {
// Do not check quota if edits log is still being processed
return;
}
@ -1952,7 +1895,7 @@ void verifySnapshotName(String snapshotName, String path)
void verifyINodeName(byte[] childName) throws HadoopIllegalArgumentException {
if (Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, childName)) {
String s = "\"" + HdfsConstants.DOT_SNAPSHOT_DIR + "\" is a reserved name.";
if (!ready) {
if (!namesystem.isImageLoaded()) {
s += " Please rename it before upgrade.";
}
throw new HadoopIllegalArgumentException(s);
@ -1979,7 +1922,7 @@ private void verifyMaxComponentLength(byte[] childName, Object parentPath,
getFullPathName((INode[])parentPath, pos - 1): (String)parentPath;
final PathComponentTooLongException e = new PathComponentTooLongException(
maxComponentLength, length, p, DFSUtil.bytes2String(childName));
if (ready) {
if (namesystem.isImageLoaded()) {
throw e;
} else {
// Do not throw if edits log is still being processed
@ -2003,7 +1946,7 @@ private void verifyMaxDirItems(INode[] pathComponents, int pos)
if (count >= maxDirItems) {
final MaxDirectoryItemsExceededException e
= new MaxDirectoryItemsExceededException(maxDirItems, count);
if (ready) {
if (namesystem.isImageLoaded()) {
e.setPathName(getFullPathName(pathComponents, pos - 1));
throw e;
} else {
@ -2339,7 +2282,6 @@ private boolean unprotectedSetTimes(INode inode, long mtime,
void reset() {
writeLock();
try {
setReady(false);
rootDir = createRoot(getFSNamesystem());
inodeMap.clear();
addToInodeMap(rootDir);

View File

@ -103,6 +103,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -514,6 +515,59 @@ private void logAuditEvent(boolean succeeded,
private final NNConf nnConf;
private volatile boolean imageLoaded = false;
private final Condition cond;
/**
* Notify that loading of this FSDirectory is complete, and
* it is imageLoaded for use
*/
void imageLoadComplete() {
Preconditions.checkState(!imageLoaded, "FSDirectory already loaded");
setImageLoaded();
}
void setImageLoaded() {
if(imageLoaded) return;
writeLock();
try {
setImageLoaded(true);
dir.markNameCacheInitialized();
cond.signalAll();
} finally {
writeUnlock();
}
}
//This is for testing purposes only
@VisibleForTesting
boolean isImageLoaded() {
return imageLoaded;
}
// exposed for unit tests
protected void setImageLoaded(boolean flag) {
imageLoaded = flag;
}
/**
* Block until the object is imageLoaded to be used.
*/
void waitForLoadingFSImage() {
if (!imageLoaded) {
writeLock();
try {
while (!imageLoaded) {
try {
cond.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
}
}
} finally {
writeUnlock();
}
}
}
/**
* Set the last allocated inode id when fsimage or editlog is loaded.
*/
@ -555,6 +609,7 @@ void clear() {
inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
snapshotManager.clearSnapshottableDirs();
cacheManager.clear();
setImageLoaded(false);
}
@VisibleForTesting
@ -682,6 +737,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
LOG.info("fsLock is fair:" + fair);
fsLock = new FSNamesystemLock(fair);
cond = fsLock.writeLock().newCondition();
try {
resourceRecheckInterval = conf.getLong(
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
@ -921,7 +977,7 @@ private void loadFSImage(StartupOption startOpt) throws IOException {
}
writeUnlock();
}
dir.imageLoadComplete();
imageLoadComplete();
}
private void startSecretManager() {
@ -1840,6 +1896,7 @@ private void concatInt(String target, String [] srcs,
HdfsFileStatus resultingStat = null;
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2115,6 +2172,7 @@ private boolean setReplicationInt(String src, final short replication)
FSPermissionChecker pc = getPermissionChecker();
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2242,6 +2300,8 @@ private HdfsFileStatus startFileInt(String src, PermissionStatus permissions,
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean create = flag.contains(CreateFlag.CREATE);
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2730,6 +2790,7 @@ LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
Block newBlock = null;
long offset;
checkOperation(OperationCategory.WRITE);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -2952,6 +3013,7 @@ boolean abandonBlock(ExtendedBlock b, long fileId, String src, String holder)
}
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3050,6 +3112,7 @@ boolean completeFile(String src, String holder,
boolean success = false;
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3249,6 +3312,7 @@ private boolean renameToInt(String src, String dst, boolean logRetryCache)
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot rename " + src);
waitForLoadingFSImage();
src = FSDirectory.resolvePath(src, srcComponents, dir);
dst = FSDirectory.resolvePath(dst, dstComponents, dir);
checkOperation(OperationCategory.WRITE);
@ -3356,6 +3420,7 @@ private void renameToInternal(FSPermissionChecker pc, String src, String dst,
false);
}
waitForLoadingFSImage();
long mtime = now();
dir.renameTo(src, dst, mtime, options);
getEditLog().logRename(src, dst, mtime, logRetryCache, options);
@ -3429,6 +3494,8 @@ private boolean deleteInternal(String src, boolean recursive,
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
boolean ret = false;
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -3902,6 +3969,8 @@ void fsync(String src, long fileId, String clientName, long lastBlockLength)
NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
checkOperation(OperationCategory.WRITE);
byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -4103,6 +4172,7 @@ private void finalizeINodeFileUnderConstruction(String src,
INodeFile pendingFile, int latestSnapshot) throws IOException,
UnresolvedLinkException {
assert hasWriteLock();
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
Preconditions.checkArgument(uc != null);
leaseManager.removeLease(uc.getClientName(), src);
@ -4114,6 +4184,7 @@ private void finalizeINodeFileUnderConstruction(String src,
// since we just remove the uc feature from pendingFile
final INodeFile newFile = pendingFile.toCompleteFile(now());
waitForLoadingFSImage();
// close file and persist block allocations for this file
closeFile(src, newFile);
@ -4172,6 +4243,7 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
+ ")");
checkOperation(OperationCategory.WRITE);
String src = "";
waitForLoadingFSImage();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -4517,7 +4589,7 @@ void incrDeletedFileCount(long count) {
*/
private void closeFile(String path, INodeFile file) {
assert hasWriteLock();
dir.waitForReady();
waitForLoadingFSImage();
// file is closed
getEditLog().logCloseFile(path, file);
if (NameNode.stateChangeLog.isDebugEnabled()) {
@ -4541,7 +4613,7 @@ private INodeSymlink addSymlink(String path, String target,
boolean createParent, boolean logRetryCache)
throws UnresolvedLinkException, FileAlreadyExistsException,
QuotaExceededException, SnapshotAccessControlException, AclException {
dir.waitForReady();
waitForLoadingFSImage();
final long modTime = now();
if (createParent) {
@ -5804,7 +5876,7 @@ private void checkPermission(FSPermissionChecker pc,
boolean ignoreEmptyDir, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException {
if (!pc.isSuperUser()) {
dir.waitForReady();
waitForLoadingFSImage();
readLock();
try {
pc.checkPermission(path, dir, doCheckOwner, ancestorAccess,
@ -6271,6 +6343,7 @@ void updatePipeline(String clientName, ExtendedBlock oldBlock,
+ ", newNodes=" + Arrays.asList(newNodes)
+ ", clientName=" + clientName
+ ")");
waitForLoadingFSImage();
writeLock();
boolean success = false;
try {

View File

@ -1064,7 +1064,7 @@ static void doMerge(
} finally {
dstNamesystem.writeUnlock();
}
dstNamesystem.dir.imageLoadComplete();
dstNamesystem.imageLoadComplete();
}
// error simulation code for junit test
CheckpointFaultInjector.getInstance().duringMerge();

View File

@ -32,7 +32,6 @@
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.*;
/**
@ -50,6 +49,7 @@ private FSNamesystem makeNameSystemSpy(Block block, INodeFile file)
final DatanodeStorageInfo[] targets = {};
FSNamesystem namesystem = new FSNamesystem(conf, image);
namesystem.setImageLoaded(true);
FSNamesystem namesystemSpy = spy(namesystem);
BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);

View File

@ -36,7 +36,6 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
@ -127,16 +126,6 @@ public void testDumpTree() throws Exception {
}
}
@Test
public void testReset() throws Exception {
fsdir.reset();
Assert.assertFalse(fsdir.isReady());
final INodeDirectory root = (INodeDirectory) fsdir.getINode("/");
Assert.assertTrue(root.getChildrenList(Snapshot.CURRENT_STATE_ID).isEmpty());
fsdir.imageLoadComplete();
Assert.assertTrue(fsdir.isReady());
}
@Test
public void testSkipQuotaCheck() throws Exception {
try {

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.junit.After;
import org.junit.Test;
import org.mockito.Mockito;
@ -194,4 +195,22 @@ public void testFSNamesystemLockCompatibility() {
assertFalse(rwLock.isWriteLockedByCurrentThread());
assertEquals(0, rwLock.getWriteHoldCount());
}
@Test
public void testReset() throws Exception {
Configuration conf = new Configuration();
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
FSImage fsImage = Mockito.mock(FSImage.class);
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
fsn.imageLoadComplete();
assertTrue(fsn.isImageLoaded());
fsn.clear();
assertFalse(fsn.isImageLoaded());
final INodeDirectory root = (INodeDirectory) fsn.getFSDirectory()
.getINode("/");
assertTrue(root.getChildrenList(Snapshot.CURRENT_STATE_ID).isEmpty());
fsn.imageLoadComplete();
assertTrue(fsn.isImageLoaded());
}
}

View File

@ -19,12 +19,9 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.apache.hadoop.util.Time.now;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
@ -57,7 +54,7 @@ static private FSNamesystem getMockNamesystem() throws IOException {
FSEditLog editLog = mock(FSEditLog.class);
doReturn(editLog).when(fsImage).getEditLog();
FSNamesystem fsn = new FSNamesystem(conf, fsImage);
fsn.getFSDirectory().setReady(fsIsReady);
fsn.setImageLoaded(fsIsReady);
return fsn;
}