HDFS-2718. Optimize OP_ADD in edits loading. Contributed by Konstantin Shvachko.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1239760 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1950f97ab9
commit
191db6a907
@ -1665,6 +1665,8 @@ Release 0.22.1 - Unreleased
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-2718. Optimize OP_ADD in edits loading. (shv)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
Release 0.22.0 - 2011-11-29
|
||||
|
@ -157,9 +157,6 @@ public class BlockInfoUnderConstruction extends BlockInfo {
|
||||
BlockInfo convertToCompleteBlock() throws IOException {
|
||||
assert getBlockUCState() != BlockUCState.COMPLETE :
|
||||
"Trying to convert a COMPLETE block";
|
||||
if(getBlockUCState() != BlockUCState.COMMITTED)
|
||||
throw new IOException(
|
||||
"Cannot complete block: block has not been COMMITTED by the client");
|
||||
return new BlockInfo(this);
|
||||
}
|
||||
|
||||
|
@ -438,15 +438,23 @@ public class BlockManager {
|
||||
*/
|
||||
private BlockInfo completeBlock(final INodeFile fileINode,
|
||||
final int blkIndex) throws IOException {
|
||||
return completeBlock(fileINode, blkIndex, false);
|
||||
}
|
||||
|
||||
public BlockInfo completeBlock(final INodeFile fileINode,
|
||||
final int blkIndex, final boolean force) throws IOException {
|
||||
if(blkIndex < 0)
|
||||
return null;
|
||||
BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
|
||||
if(curBlock.isComplete())
|
||||
return curBlock;
|
||||
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
|
||||
if(ucBlock.numNodes() < minReplication)
|
||||
if(!force && ucBlock.numNodes() < minReplication)
|
||||
throw new IOException("Cannot complete block: " +
|
||||
"block does not satisfy minimal replication requirement.");
|
||||
if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
|
||||
throw new IOException(
|
||||
"Cannot complete block: block has not been COMMITTED by the client");
|
||||
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
|
||||
// replace penultimate block in file
|
||||
fileINode.setBlock(blkIndex, completeBlock);
|
||||
|
@ -267,22 +267,28 @@ public class FSDirectory implements Closeable {
|
||||
short replication,
|
||||
long modificationTime,
|
||||
long atime,
|
||||
long preferredBlockSize)
|
||||
long preferredBlockSize,
|
||||
String clientName,
|
||||
String clientMachine)
|
||||
throws UnresolvedLinkException {
|
||||
INode newNode;
|
||||
long diskspace = UNKNOWN_DISK_SPACE;
|
||||
assert hasWriteLock();
|
||||
if (blocks == null)
|
||||
newNode = new INodeDirectory(permissions, modificationTime);
|
||||
else {
|
||||
else if(blocks.length == 0 || blocks[blocks.length-1].getBlockUCState()
|
||||
== BlockUCState.UNDER_CONSTRUCTION) {
|
||||
newNode = new INodeFileUnderConstruction(
|
||||
permissions, blocks.length, replication,
|
||||
preferredBlockSize, modificationTime, clientName,
|
||||
clientMachine, null);
|
||||
} else {
|
||||
newNode = new INodeFile(permissions, blocks.length, replication,
|
||||
modificationTime, atime, preferredBlockSize);
|
||||
diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
|
||||
}
|
||||
writeLock();
|
||||
try {
|
||||
try {
|
||||
newNode = addNode(path, newNode, diskspace);
|
||||
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
|
||||
if(newNode != null && blocks != null) {
|
||||
int nrBlocks = blocks.length;
|
||||
// Add file->block mapping
|
||||
@ -301,6 +307,74 @@ public class FSDirectory implements Closeable {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Update files in-memory data structures with new block information.
|
||||
* @throws IOException
|
||||
*/
|
||||
void updateFile(INodeFile file,
|
||||
String path,
|
||||
PermissionStatus permissions,
|
||||
BlockInfo[] blocks,
|
||||
short replication,
|
||||
long mtime,
|
||||
long atime,
|
||||
long preferredBlockSize) throws IOException {
|
||||
|
||||
// Update the salient file attributes.
|
||||
file.setAccessTime(atime);
|
||||
file.setModificationTimeForce(mtime);
|
||||
|
||||
// Update its block list
|
||||
BlockInfo[] oldBlocks = file.getBlocks();
|
||||
|
||||
// Are we only updating the last block's gen stamp.
|
||||
boolean isGenStampUpdate = oldBlocks.length == blocks.length;
|
||||
|
||||
// First, update blocks in common
|
||||
BlockInfo oldBlock = null;
|
||||
for (int i = 0; i < oldBlocks.length && i < blocks.length; i++) {
|
||||
oldBlock = oldBlocks[i];
|
||||
Block newBlock = blocks[i];
|
||||
|
||||
boolean isLastBlock = i == oldBlocks.length - 1;
|
||||
if (oldBlock.getBlockId() != newBlock.getBlockId() ||
|
||||
(oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
|
||||
!(isGenStampUpdate && isLastBlock))) {
|
||||
throw new IOException("Mismatched block IDs or generation stamps, " +
|
||||
"attempting to replace block " + oldBlock + " with " + newBlock +
|
||||
" as block # " + i + "/" + blocks.length + " of " + path);
|
||||
}
|
||||
|
||||
oldBlock.setNumBytes(newBlock.getNumBytes());
|
||||
oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
|
||||
}
|
||||
|
||||
if (blocks.length < oldBlocks.length) {
|
||||
// We're removing a block from the file, e.g. abandonBlock(...)
|
||||
if (!file.isUnderConstruction()) {
|
||||
throw new IOException("Trying to remove a block from file " +
|
||||
path + " which is not under construction.");
|
||||
}
|
||||
if (blocks.length != oldBlocks.length - 1) {
|
||||
throw new IOException("Trying to remove more than one block from file "
|
||||
+ path);
|
||||
}
|
||||
unprotectedRemoveBlock(path,
|
||||
(INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
|
||||
} else if (blocks.length > oldBlocks.length) {
|
||||
// We're adding blocks
|
||||
// First complete last old Block
|
||||
getBlockManager().completeBlock(file, oldBlocks.length-1, true);
|
||||
// Add the new blocks
|
||||
for (int i = oldBlocks.length; i < blocks.length; i++) {
|
||||
// addBlock();
|
||||
BlockInfo newBI = blocks[i];
|
||||
getBlockManager().addINode(newBI, file);
|
||||
file.addBlock(newBI);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
|
||||
INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
|
||||
// NOTE: This does not update space counts for parents
|
||||
@ -422,28 +496,33 @@ public class FSDirectory implements Closeable {
|
||||
|
||||
writeLock();
|
||||
try {
|
||||
// modify file-> block and blocksMap
|
||||
fileNode.removeLastBlock(block);
|
||||
getBlockManager().removeBlockFromMap(block);
|
||||
|
||||
unprotectedRemoveBlock(path, fileNode, block);
|
||||
// write modified block locations to log
|
||||
fsImage.getEditLog().logOpenFile(path, fileNode);
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
|
||||
+path+" with "+block
|
||||
+" block is removed from the file system");
|
||||
}
|
||||
|
||||
// update space consumed
|
||||
INode[] pathINodes = getExistingPathINodes(path);
|
||||
updateCount(pathINodes, pathINodes.length-1, 0,
|
||||
-fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode,
|
||||
Block block) throws IOException {
|
||||
// modify file-> block and blocksMap
|
||||
fileNode.removeLastBlock(block);
|
||||
getBlockManager().removeBlockFromMap(block);
|
||||
|
||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
|
||||
+path+" with "+block
|
||||
+" block is removed from the file system");
|
||||
}
|
||||
|
||||
// update space consumed
|
||||
INode[] pathINodes = getExistingPathINodes(path);
|
||||
updateCount(pathINodes, pathINodes.length-1, 0,
|
||||
-fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #unprotectedRenameTo(String, String, long)
|
||||
* @deprecated Use {@link #renameTo(String, String, Rename...)} instead.
|
||||
|
@ -187,31 +187,53 @@ public class FSEditLogLoader {
|
||||
" clientMachine " + addCloseOp.clientMachine);
|
||||
}
|
||||
|
||||
fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
|
||||
// There are four cases here:
|
||||
// 1. OP_ADD to create a new file
|
||||
// 2. OP_ADD to update file blocks
|
||||
// 3. OP_ADD to open file for append
|
||||
// 4. OP_CLOSE to close the file
|
||||
|
||||
// add to the file tree
|
||||
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
|
||||
addCloseOp.path, permissions,
|
||||
blocks, replication,
|
||||
addCloseOp.mtime, addCloseOp.atime, blockSize);
|
||||
if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
|
||||
//
|
||||
// Replace current node with a INodeUnderConstruction.
|
||||
// Recreate in-memory lease record.
|
||||
//
|
||||
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
|
||||
node.getLocalNameBytes(),
|
||||
node.getReplication(),
|
||||
node.getModificationTime(),
|
||||
node.getPreferredBlockSize(),
|
||||
node.getBlocks(),
|
||||
node.getPermissionStatus(),
|
||||
addCloseOp.clientName,
|
||||
addCloseOp.clientMachine,
|
||||
null);
|
||||
fsDir.replaceNode(addCloseOp.path, node, cons);
|
||||
fsNamesys.leaseManager.addLease(cons.getClientName(),
|
||||
addCloseOp.path);
|
||||
// See if the file already exists
|
||||
INodeFile oldFile = fsDir.getFileINode(addCloseOp.path);
|
||||
if (oldFile == null) { // OP_ADD for a new file
|
||||
assert addCloseOp.opCode == FSEditLogOpCodes.OP_ADD :
|
||||
"Expected opcode OP_ADD, but got " + addCloseOp.opCode;
|
||||
fsDir.unprotectedAddFile(
|
||||
addCloseOp.path, permissions, blocks, replication,
|
||||
addCloseOp.mtime, addCloseOp.atime, blockSize,
|
||||
addCloseOp.clientName, addCloseOp.clientMachine);
|
||||
} else {
|
||||
fsDir.updateFile(oldFile,
|
||||
addCloseOp.path, permissions, blocks, replication,
|
||||
addCloseOp.mtime, addCloseOp.atime, blockSize);
|
||||
if(addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE) { // OP_CLOSE
|
||||
assert oldFile.isUnderConstruction() :
|
||||
"File is not under construction: " + addCloseOp.path;
|
||||
fsNamesys.getBlockManager().completeBlock(
|
||||
oldFile, blocks.length-1, true);
|
||||
INodeFile newFile =
|
||||
((INodeFileUnderConstruction)oldFile).convertToInodeFile();
|
||||
fsDir.replaceNode(addCloseOp.path, oldFile, newFile);
|
||||
} else if(! oldFile.isUnderConstruction()) { // OP_ADD for append
|
||||
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
|
||||
oldFile.getLocalNameBytes(),
|
||||
oldFile.getReplication(),
|
||||
oldFile.getModificationTime(),
|
||||
oldFile.getPreferredBlockSize(),
|
||||
oldFile.getBlocks(),
|
||||
oldFile.getPermissionStatus(),
|
||||
addCloseOp.clientName,
|
||||
addCloseOp.clientMachine,
|
||||
null);
|
||||
fsDir.replaceNode(addCloseOp.path, oldFile, cons);
|
||||
}
|
||||
}
|
||||
// Update file lease
|
||||
if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
|
||||
fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
|
||||
} else { // Ops.OP_CLOSE
|
||||
fsNamesys.leaseManager.removeLease(
|
||||
((INodeFileUnderConstruction)oldFile).getClientName(), addCloseOp.path);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -41,8 +41,20 @@ public class INodeFileUnderConstruction extends INodeFile {
|
||||
String clientName,
|
||||
String clientMachine,
|
||||
DatanodeDescriptor clientNode) {
|
||||
super(permissions.applyUMask(UMASK), 0, replication, modTime, modTime,
|
||||
preferredBlockSize);
|
||||
this(permissions, 0, replication, preferredBlockSize, modTime,
|
||||
clientName, clientMachine, clientNode);
|
||||
}
|
||||
|
||||
INodeFileUnderConstruction(PermissionStatus permissions,
|
||||
int nrBlocks,
|
||||
short replication,
|
||||
long preferredBlockSize,
|
||||
long modTime,
|
||||
String clientName,
|
||||
String clientMachine,
|
||||
DatanodeDescriptor clientNode) {
|
||||
super(permissions.applyUMask(UMASK), nrBlocks, replication,
|
||||
modTime, modTime, preferredBlockSize);
|
||||
this.clientName = clientName;
|
||||
this.clientMachine = clientMachine;
|
||||
this.clientNode = clientNode;
|
||||
|
@ -72,12 +72,20 @@ public class TestAbandonBlock {
|
||||
|
||||
// Now abandon the last block
|
||||
DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
|
||||
LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, 1);
|
||||
LocatedBlocks blocks =
|
||||
dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
|
||||
int orginalNumBlocks = blocks.locatedBlockCount();
|
||||
LocatedBlock b = blocks.getLastLocatedBlock();
|
||||
dfsclient.getNamenode().abandonBlock(b.getBlock(), src, dfsclient.clientName);
|
||||
|
||||
// And close the file
|
||||
fout.close();
|
||||
|
||||
// Close cluster and check the block has been abandoned after restart
|
||||
cluster.restartNameNode();
|
||||
blocks = dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
|
||||
assert orginalNumBlocks == blocks.locatedBlockCount() + 1 :
|
||||
"Blocks " + b + " has not been abandoned.";
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -116,10 +116,12 @@ public class TestEditLog extends TestCase {
|
||||
int numTransactions;
|
||||
short replication = 3;
|
||||
long blockSize = 64;
|
||||
int startIndex;
|
||||
|
||||
Transactions(FSNamesystem ns, int num) {
|
||||
Transactions(FSNamesystem ns, int numTx, int startIdx) {
|
||||
namesystem = ns;
|
||||
numTransactions = num;
|
||||
numTransactions = numTx;
|
||||
startIndex = startIdx;
|
||||
}
|
||||
|
||||
// add a bunch of transactions.
|
||||
@ -131,8 +133,8 @@ public class TestEditLog extends TestCase {
|
||||
for (int i = 0; i < numTransactions; i++) {
|
||||
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
|
||||
p, replication, blockSize, 0, "", "", null);
|
||||
editLog.logOpenFile("/filename" + i, inode);
|
||||
editLog.logCloseFile("/filename" + i, inode);
|
||||
editLog.logOpenFile("/filename" + startIndex + i, inode);
|
||||
editLog.logCloseFile("/filename" + startIndex + i, inode);
|
||||
editLog.logSync();
|
||||
}
|
||||
}
|
||||
@ -280,7 +282,8 @@ public class TestEditLog extends TestCase {
|
||||
// Create threads and make them run transactions concurrently.
|
||||
Thread threadId[] = new Thread[NUM_THREADS];
|
||||
for (int i = 0; i < NUM_THREADS; i++) {
|
||||
Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
|
||||
Transactions trans =
|
||||
new Transactions(namesystem, NUM_TRANSACTIONS, i*NUM_TRANSACTIONS);
|
||||
threadId[i] = new Thread(trans, "TransactionThread-" + i);
|
||||
threadId[i].start();
|
||||
}
|
||||
@ -293,11 +296,16 @@ public class TestEditLog extends TestCase {
|
||||
i--; // retry
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Reopen some files as for append
|
||||
Transactions trans =
|
||||
new Transactions(namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
|
||||
trans.run();
|
||||
|
||||
// Roll another time to finalize edits_inprogress_3
|
||||
fsimage.rollEditLog();
|
||||
|
||||
long expectedTxns = (NUM_THREADS * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
|
||||
long expectedTxns = ((NUM_THREADS+1) * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
|
||||
|
||||
// Verify that we can read in all the transactions that we have written.
|
||||
// If there were any corruptions, it is likely that the reading in
|
||||
|
Loading…
x
Reference in New Issue
Block a user