HDFS-2602. NN should log newly-allocated blocks without losing BlockInfo. Contributed by Aaron T. Myers

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1215036 13f79535-47bb-0310-9956-ffa450edef68
Aaron Myers 2011-12-16 04:18:58 +00:00
parent 116bf57bd6
commit 71071b904d
12 changed files with 574 additions and 145 deletions

CHANGES.HDFS-1623.txt

@ -57,3 +57,5 @@ HDFS-2680. DFSClient should construct failover proxy with exponential backoff (t
HDFS-2683. Authority-based lookup of proxy provider fails if path becomes canonicalized (todd)
HDFS-2689. HA: BookKeeperEditLogInputStream doesn't implement isInProgress() (atm)
HDFS-2602. NN should log newly-allocated blocks without losing BlockInfo (atm)

DFSConfigKeys.java

@ -125,6 +125,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
public static final boolean DFS_PERMISSIONS_ENABLED_DEFAULT = true;
public static final String DFS_PERSIST_BLOCKS_KEY = "dfs.persist.blocks";
public static final boolean DFS_PERSIST_BLOCKS_DEFAULT = false;
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_KEY = "dfs.permissions.superusergroup";
public static final String DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT = "supergroup";
public static final String DFS_ADMIN = "dfs.cluster.administrators";

HAUtil.java

@ -44,6 +44,16 @@ public class HAUtil {
return nnMap != null && nnMap.size() > 1;
}
/**
* Returns true if HA is using a shared edits directory.
*
* @param conf Configuration
* @return true if HA config is using a shared edits dir, false otherwise.
*/
public static boolean usesSharedEditsDir(Configuration conf) {
return null != conf.get(DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
}
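For context, and not part of the patch itself: a minimal sketch of how the new dfs.persist.blocks key and these HA helpers are intended to combine, mirroring the FSNamesystem change later in this diff. The helper class name and the import locations are assumptions made for illustration only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
// Hypothetical helper (not in this commit): decides whether the NameNode
// should log every block allocation to the edit log.
class PersistBlocksPolicy {
  static boolean shouldPersistBlocks(Configuration conf) {
    // Explicit opt-in via the new key (default false).
    boolean persist = conf.getBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY,
        DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT);
    // HA with a shared edits directory forces persistence so the standby,
    // tailing the shared edits, sees newly-allocated blocks without losing
    // their BlockInfo.
    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
    return persist
        || (HAUtil.isHAEnabled(conf, nameserviceId)
            && HAUtil.usesSharedEditsDir(conf));
  }
}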
/**
* Get the namenode Id by matching the {@code addressKey}
* with the address of the local node.

BlockManager.java

@ -425,7 +425,7 @@ public class BlockManager {
final boolean b = commitBlock((BlockInfoUnderConstruction)lastBlock, commitBlock);
if(countNodes(lastBlock).liveReplicas() >= minReplication)
completeBlock(fileINode,fileINode.numBlocks()-1);
completeBlock(fileINode,fileINode.numBlocks()-1, false);
return b;
}
@ -437,14 +437,14 @@ public class BlockManager {
* of replicas reported from data-nodes.
*/
private BlockInfo completeBlock(final INodeFile fileINode,
final int blkIndex) throws IOException {
final int blkIndex, boolean force) throws IOException {
if(blkIndex < 0)
return null;
BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
if(curBlock.isComplete())
return curBlock;
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
if(ucBlock.numNodes() < minReplication)
if (!force && ucBlock.numNodes() < minReplication)
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
@ -455,15 +455,27 @@ public class BlockManager {
}
private BlockInfo completeBlock(final INodeFile fileINode,
final BlockInfo block) throws IOException {
final BlockInfo block, boolean force) throws IOException {
BlockInfo[] fileBlocks = fileINode.getBlocks();
for(int idx = 0; idx < fileBlocks.length; idx++)
if(fileBlocks[idx] == block) {
return completeBlock(fileINode, idx);
return completeBlock(fileINode, idx, force);
}
return block;
}
/**
* Force the given block in the given file to be marked as complete,
* regardless of whether enough replicas are present. This is necessary
* when tailing edit logs as a Standby.
*/
public BlockInfo forceCompleteBlock(final INodeFile fileINode,
final BlockInfoUnderConstruction block) throws IOException {
block.commitBlock(block);
return completeBlock(fileINode, block, true);
}
/**
* Convert the last block of the file to an under construction block.<p>
* The block is converted only if the file has blocks and the last one
@ -590,8 +602,8 @@ public class BlockManager {
final boolean isCorrupt = numCorruptNodes == numNodes;
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
final DatanodeDescriptor[] machines = new DatanodeDescriptor[numMachines];
int j = 0;
if (numMachines > 0) {
int j = 0;
for(Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(blk);
it.hasNext();) {
final DatanodeDescriptor d = it.next();
@ -600,6 +612,12 @@ public class BlockManager {
machines[j++] = d;
}
}
assert j == machines.length :
"isCorrupt: " + isCorrupt +
" numMachines: " + numMachines +
" numNodes: " + numNodes +
" numCorrupt: " + numCorruptNodes +
" numCorruptRepls: " + numCorruptReplicas;
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return new LocatedBlock(eb, machines, pos, isCorrupt);
}
@ -1608,7 +1626,7 @@ public class BlockManager {
int numCurrentReplica = countLiveNodes(storedBlock);
if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED
&& numCurrentReplica >= minReplication)
storedBlock = completeBlock(storedBlock.getINode(), storedBlock);
storedBlock = completeBlock(storedBlock.getINode(), storedBlock, false);
// check whether safe replication is reached for the block
// only complete blocks are counted towards that
@ -1673,7 +1691,7 @@ public class BlockManager {
if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
numLiveReplicas >= minReplication)
storedBlock = completeBlock(fileINode, storedBlock);
storedBlock = completeBlock(fileINode, storedBlock, false);
// check whether safe replication is reached for the block
// only complete blocks are counted towards that

FSDirectory.java

@ -263,34 +263,19 @@ public class FSDirectory implements Closeable {
*/
INode unprotectedAddFile( String path,
PermissionStatus permissions,
BlockInfo[] blocks,
short replication,
long modificationTime,
long atime,
long preferredBlockSize)
throws UnresolvedLinkException {
INode newNode;
long diskspace = UNKNOWN_DISK_SPACE;
assert hasWriteLock();
if (blocks == null)
newNode = new INodeDirectory(permissions, modificationTime);
else {
newNode = new INodeFile(permissions, blocks.length, replication,
modificationTime, atime, preferredBlockSize);
diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
}
newNode = new INodeFile(permissions, new BlockInfo[0], replication,
modificationTime, atime, preferredBlockSize);
writeLock();
try {
try {
newNode = addNode(path, newNode, diskspace);
if(newNode != null && blocks != null) {
int nrBlocks = blocks.length;
// Add file->block mapping
INodeFile newF = (INodeFile)newNode;
for (int i = 0; i < nrBlocks; i++) {
newF.setBlock(i, getBlockManager().addINode(blocks[i], newF));
}
}
newNode = addNode(path, newNode, 0);
} catch (IOException e) {
return null;
}
@ -391,7 +376,7 @@ public class FSDirectory implements Closeable {
writeUnlock();
}
}
/**
* Close file.
*/
@ -414,7 +399,7 @@ public class FSDirectory implements Closeable {
}
/**
* Remove a block to the file.
* Remove a block from the file.
*/
boolean removeBlock(String path, INodeFileUnderConstruction fileNode,
Block block) throws IOException {
@ -422,27 +407,32 @@ public class FSDirectory implements Closeable {
writeLock();
try {
// modify file-> block and blocksMap
fileNode.removeLastBlock(block);
getBlockManager().removeBlockFromMap(block);
unprotectedRemoveBlock(path, fileNode, block);
// write modified block locations to log
fsImage.getEditLog().logOpenFile(path, fileNode);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+path+" with "+block
+" block is removed from the file system");
}
// update space consumed
INode[] pathINodes = getExistingPathINodes(path);
updateCount(pathINodes, pathINodes.length-1, 0,
-fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
} finally {
writeUnlock();
}
return true;
}
void unprotectedRemoveBlock(String path,
INodeFileUnderConstruction fileNode, Block block) throws IOException {
// modify file-> block and blocksMap
fileNode.removeLastBlock(block);
getBlockManager().removeBlockFromMap(block);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+path+" with "+block
+" block is removed from the file system");
}
// update space consumed
INode[] pathINodes = getExistingPathINodes(path);
updateCount(pathINodes, pathINodes.length - 1, 0,
- fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
}
/**
* @see #unprotectedRenameTo(String, String, long)

FSEditLogLoader.java

@ -28,6 +28,7 @@ import java.util.EnumMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@ -57,6 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.util.Holder;
import com.google.common.base.Joiner;
@InterfaceAudience.Private
@ -137,82 +139,84 @@ public class FSEditLogLoader {
numEdits++;
incrOpCount(op.opCode, opCounts);
switch (op.opCode) {
case OP_ADD:
case OP_CLOSE: {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
// versions > 0 support per file replication
// get name and replication
final short replication = fsNamesys.getBlockManager(
).adjustReplication(addCloseOp.replication);
long blockSize = addCloseOp.blockSize;
BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
for (int i = 0; i < addCloseOp.blocks.length; i++) {
if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD
&& i == addCloseOp.blocks.length-1) {
blocks[i] = new BlockInfoUnderConstruction(addCloseOp.blocks[i],
replication);
} else {
blocks[i] = new BlockInfo(addCloseOp.blocks[i], replication);
// See if the file already exists (persistBlocks call)
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
if (oldFile == null) { // this is OP_ADD on a new file
// versions > 0 support per file replication
// get name and replication
final short replication = fsNamesys.getBlockManager(
).adjustReplication(addCloseOp.replication);
PermissionStatus permissions = fsNamesys.getUpgradePermission();
if (addCloseOp.permissions != null) {
permissions = addCloseOp.permissions;
}
}
PermissionStatus permissions = fsNamesys.getUpgradePermission();
if (addCloseOp.permissions != null) {
permissions = addCloseOp.permissions;
}
// Older versions of HDFS do not store the block size in inode.
// If the file has more than one block, use the size of the
// first block as the blocksize. Otherwise use the default
// block size.
if (-8 <= logVersion && blockSize == 0) {
if (blocks.length > 1) {
blockSize = blocks[0].getNumBytes();
} else {
long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
long blockSize = addCloseOp.blockSize;
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
// Older versions of HDFS do not store the block size in inode.
// If the file has more than one block, use the size of the
// first block as the blocksize. Otherwise use the default
// block size.
if (-8 <= logVersion && blockSize == 0) {
if (addCloseOp.blocks.length > 1) {
blockSize = addCloseOp.blocks[0].getNumBytes();
} else {
long first = ((addCloseOp.blocks.length == 1)?
addCloseOp.blocks[0].getNumBytes(): 0);
blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
}
}
// TODO: We should do away with this add-then-replace dance.
// add to the file tree
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
addCloseOp.path, permissions,
replication, addCloseOp.mtime,
addCloseOp.atime, blockSize);
fsNamesys.prepareFileForWrite(addCloseOp.path, node,
addCloseOp.clientName, addCloseOp.clientMachine, null);
} else { // This is OP_ADD on an existing file
if (!oldFile.isUnderConstruction()) {
// This is a call to append() on an already-closed file.
fsNamesys.prepareFileForWrite(addCloseOp.path, oldFile,
addCloseOp.clientName, addCloseOp.clientMachine, null);
oldFile = getINodeFile(fsDir, addCloseOp.path);
}
updateBlocks(fsDir, addCloseOp, oldFile);
}
// The open lease transaction re-creates a file if necessary.
// Delete the file if it already exists.
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
" numblocks : " + blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
break;
}
case OP_CLOSE: {
AddCloseOp addCloseOp = (AddCloseOp)op;
INodeFile oldFile = getINodeFile(fsDir, addCloseOp.path);
if (oldFile == null) {
throw new IOException("Operation trying to close non-existent file " +
addCloseOp.path);
}
// Update in-memory data structures
updateBlocks(fsDir, addCloseOp, oldFile);
fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
// add to the file tree
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
addCloseOp.path, permissions,
blocks, replication,
addCloseOp.mtime, addCloseOp.atime, blockSize);
if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
//
// Replace current node with a INodeUnderConstruction.
// Recreate in-memory lease record.
//
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
node.getLocalNameBytes(),
node.getReplication(),
node.getModificationTime(),
node.getPreferredBlockSize(),
node.getBlocks(),
node.getPermissionStatus(),
addCloseOp.clientName,
addCloseOp.clientMachine,
null);
fsDir.replaceNode(addCloseOp.path, node, cons);
fsNamesys.leaseManager.addLease(cons.getClientName(),
addCloseOp.path);
}
// Now close the file
INodeFileUnderConstruction ucFile = (INodeFileUnderConstruction) oldFile;
// TODO: we could use removeLease(holder, path) here, but OP_CLOSE
// doesn't seem to serialize the holder... unclear why!
fsNamesys.leaseManager.removeLeaseWithPrefixPath(addCloseOp.path);
INodeFile newFile = ucFile.convertToInodeFile();
fsDir.replaceNode(addCloseOp.path, ucFile, newFile);
break;
}
case OP_SET_REPLICATION: {
@ -404,7 +408,88 @@ public class FSEditLogLoader {
}
return numEdits;
}
private static INodeFile getINodeFile(FSDirectory fsDir, String path)
throws IOException {
INode inode = fsDir.getINode(path);
if (inode != null) {
if (!(inode instanceof INodeFile)) {
throw new IOException("Operation trying to get non-file " + path);
}
}
return (INodeFile)inode;
}
/**
* Update in-memory data structures with new block information.
* @throws IOException
*/
private void updateBlocks(FSDirectory fsDir, AddCloseOp addCloseOp,
INodeFile file) throws IOException {
// Update the salient file attributes.
file.setAccessTime(addCloseOp.atime);
file.setModificationTimeForce(addCloseOp.mtime);
// Update its block list
BlockInfo[] oldBlocks = file.getBlocks();
// Are we only updating the last block's gen stamp?
boolean isGenStampUpdate = oldBlocks.length == addCloseOp.blocks.length;
// First, update blocks in common
for (int i = 0; i < oldBlocks.length && i < addCloseOp.blocks.length; i++) {
BlockInfo oldBlock = oldBlocks[i];
Block newBlock = addCloseOp.blocks[i];
boolean isLastBlock = i == oldBlocks.length - 1;
if (oldBlock.getBlockId() != newBlock.getBlockId() ||
(oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
!(isGenStampUpdate && isLastBlock))) {
throw new IOException("Mismatched block IDs or generation stamps, " +
"attempting to replace block " + oldBlock + " with " + newBlock +
" as block # " + i + "/" + addCloseOp.blocks.length + " of " +
addCloseOp.path);
}
oldBlock.setNumBytes(newBlock.getNumBytes());
oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
if (oldBlock instanceof BlockInfoUnderConstruction &&
(!isLastBlock || addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE)) {
fsNamesys.getBlockManager().forceCompleteBlock(
(INodeFileUnderConstruction)file,
(BlockInfoUnderConstruction)oldBlock);
}
}
if (addCloseOp.blocks.length < oldBlocks.length) {
// We're removing a block from the file, e.g. abandonBlock(...)
if (!file.isUnderConstruction()) {
throw new IOException("Trying to remove a block from file " +
addCloseOp.path + " which is not under construction.");
}
if (addCloseOp.blocks.length != oldBlocks.length - 1) {
throw new IOException("Trying to remove more than one block from file "
+ addCloseOp.path);
}
fsDir.unprotectedRemoveBlock(addCloseOp.path,
(INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
} else if (addCloseOp.blocks.length > oldBlocks.length) {
// We're adding blocks
for (int i = oldBlocks.length; i < addCloseOp.blocks.length; i++) {
Block newBlock = addCloseOp.blocks[i];
BlockInfo newBI = new BlockInfoUnderConstruction(newBlock, file.getReplication());
fsNamesys.getBlockManager().addINode(newBI, file);
file.addBlock(newBI);
}
}
if (addCloseOp.blocks.length > 0) {
fsNamesys.notifyGenStampUpdate(
addCloseOp.blocks[addCloseOp.blocks.length - 1].getGenerationStamp());
}
}
private static void dumpOpCounts(
EnumMap<FSEditLogOpCodes, Holder<Integer>> opCounts) {

FSNamesystem.java

@ -52,6 +52,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSI
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
@ -203,7 +205,7 @@ import com.google.common.base.Preconditions;
@Metrics(context="dfs")
public class FSNamesystem implements Namesystem, FSClusterStats,
FSNamesystemMBean, NameNodeMXBean {
static final Log LOG = LogFactory.getLog(FSNamesystem.class);
public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
private static final ThreadLocal<StringBuilder> auditBuffer =
new ThreadLocal<StringBuilder>() {
@ -252,6 +254,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100;
static int BLOCK_DELETION_INCREMENT = 1000;
private boolean isPermissionEnabled;
private boolean persistBlocks;
private UserGroupInformation fsOwner;
private String supergroup;
private PermissionStatus defaultPermission;
@ -669,6 +672,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DFS_PERMISSIONS_ENABLED_DEFAULT);
LOG.info("supergroup=" + supergroup);
LOG.info("isPermissionEnabled=" + isPermissionEnabled);
this.persistBlocks = conf.getBoolean(DFS_PERSIST_BLOCKS_KEY,
DFS_PERSIST_BLOCKS_DEFAULT);
// block allocation has to be persisted in HA using a shared edits directory
// so that the standby has up-to-date namespace information
String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
this.persistBlocks |= HAUtil.isHAEnabled(conf, nameserviceId) &&
HAUtil.usesSharedEditsDir(conf);
short filePermission = (short)conf.getInt(DFS_NAMENODE_UPGRADE_PERMISSION_KEY,
DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT);
this.defaultPermission = PermissionStatus.createImmutable(
@ -1403,26 +1415,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
blockManager.getDatanodeManager().getDatanodeByHost(clientMachine);
if (append && myFile != null) {
//
// Replace current node with a INodeUnderConstruction.
// Recreate in-memory lease record.
//
INodeFile node = (INodeFile) myFile;
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
node.getLocalNameBytes(),
node.getReplication(),
node.getModificationTime(),
node.getPreferredBlockSize(),
node.getBlocks(),
node.getPermissionStatus(),
holder,
clientMachine,
clientNode);
dir.replaceNode(src, node, cons);
leaseManager.addLease(cons.getClientName(), src);
// convert last block to under-construction
return blockManager.convertLastBlockToUnderConstruction(cons);
return prepareFileForWrite(src, myFile, holder, clientMachine, clientNode);
} else {
// Now we can add the name to the filesystem. This file has no
// blocks associated with it.
@ -1450,6 +1443,39 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
return null;
}
/**
* Replace current node with a INodeUnderConstruction.
* Recreate in-memory lease record.
*
* @param src path to the file
* @param file existing file object
* @param leaseHolder identifier of the lease holder on this file
* @param clientMachine identifier of the client machine
* @param clientNode if the client is collocated with a DN, that DN's descriptor
* @return the last block locations if the block is partial or null otherwise
* @throws UnresolvedLinkException
* @throws IOException
*/
public LocatedBlock prepareFileForWrite(String src, INode file,
String leaseHolder, String clientMachine, DatanodeDescriptor clientNode)
throws UnresolvedLinkException, IOException {
INodeFile node = (INodeFile) file;
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
node.getLocalNameBytes(),
node.getReplication(),
node.getModificationTime(),
node.getPreferredBlockSize(),
node.getBlocks(),
node.getPermissionStatus(),
leaseHolder,
clientMachine,
clientNode);
dir.replaceNode(src, node, cons);
leaseManager.addLease(cons.getClientName(), src);
return blockManager.convertLastBlockToUnderConstruction(cons);
}
/**
* Recover lease;
@ -1700,10 +1726,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
for (DatanodeDescriptor dn : targets) {
dn.incBlocksScheduled();
}
}
dir.persistBlocks(src, pendingFile);
} finally {
writeUnlock();
}
if (persistBlocks) {
getEditLog().logSync();
}
// Create next block
LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
@ -1782,10 +1812,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
+ b + " is removed from pendingCreates");
}
return true;
dir.persistBlocks(src, file);
} finally {
writeUnlock();
}
if (persistBlocks) {
getEditLog().logSync();
}
return true;
}
// make sure that we still have the lease on this file.
@ -2594,8 +2629,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
//remove lease, close file
finalizeINodeFileUnderConstruction(src, pendingFile);
} else if (supportAppends) {
// If this commit does not want to close the file, persist
// blocks only if append is supported
// If this commit does not want to close the file, persist blocks
// only if append is supported or we're explicitly told to
dir.persistBlocks(src, pendingFile);
}
} finally {
@ -3565,7 +3600,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
assert node != null : "Found a lease for nonexisting file.";
assert node.isUnderConstruction() :
"Found a lease for file that is not under construction.";
"Found a lease for file " + path + " that is not under construction." +
" lease=" + lease;
INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
BlockInfo[] blocks = cons.getBlocks();
if(blocks == null)
@ -3881,7 +3917,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void setGenerationStamp(long stamp) {
generationStamp.setStamp(stamp);
notifyGenStampUpdate(stamp);
}
/**
@ -4000,7 +4035,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} finally {
writeUnlock();
}
if (supportAppends) {
if (supportAppends || persistBlocks) {
getEditLog().logSync();
}
LOG.info("updatePipeline(" + oldBlock + ") successfully to " + newBlock);

LeaseManager.java

@ -153,6 +153,9 @@ public class LeaseManager {
Lease lease = getLease(holder);
if (lease != null) {
removeLease(lease, src);
} else {
LOG.warn("Removing non-existent lease! holder=" + holder +
" src=" + src);
}
}

PendingDataNodeMessages.java

@ -188,7 +188,7 @@ public class PendingDataNodeMessages {
*/
synchronized DataNodeMessage take(long gs) {
DataNodeMessage m = queue.peek();
if (m != null && m.getTargetGs() < gs) {
if (m != null && m.getTargetGs() <= gs) {
return queue.remove();
} else {
return null;

EditLogTailer.java

@ -152,4 +152,5 @@ public class EditLogTailer {
}
}
}
}

TestPersistBlocks.java (new file)

@ -0,0 +1,280 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import java.io.IOException;
import java.util.Random;
import static org.junit.Assert.*;
import org.junit.Test;
/**
* A JUnit test for checking if restarting DFS preserves the
* blocks that are part of an unclosed file.
*/
public class TestPersistBlocks {
static {
((Log4JLogger)FSImage.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
}
private static final int BLOCK_SIZE = 4096;
private static final int NUM_BLOCKS = 5;
private static final String FILE_NAME = "/data";
private static final Path FILE_PATH = new Path(FILE_NAME);
static final byte[] DATA_BEFORE_RESTART = new byte[BLOCK_SIZE * NUM_BLOCKS];
static final byte[] DATA_AFTER_RESTART = new byte[BLOCK_SIZE * NUM_BLOCKS];
static {
Random rand = new Random();
rand.nextBytes(DATA_BEFORE_RESTART);
rand.nextBytes(DATA_AFTER_RESTART);
}
/** check if DFS remains in proper condition after a restart */
@Test
public void testRestartDfs() throws Exception {
final Configuration conf = new HdfsConfiguration();
// Turn off persistent IPC, so that the DFSClient can survive NN restart
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
MiniDFSCluster cluster = null;
long len = 0;
FSDataOutputStream stream;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = cluster.getFileSystem();
// Creating a file with 4096 blockSize to write multiple blocks
stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
stream.write(DATA_BEFORE_RESTART);
stream.hflush();
// Wait for at least a few blocks to get through
while (len <= BLOCK_SIZE) {
FileStatus status = fs.getFileStatus(FILE_PATH);
len = status.getLen();
Thread.sleep(100);
}
// explicitly do NOT close the file.
cluster.restartNameNode();
// Check that the file has no less bytes than before the restart
// This would mean that blocks were successfully persisted to the log
FileStatus status = fs.getFileStatus(FILE_PATH);
assertTrue("Length too short: " + status.getLen(),
status.getLen() >= len);
// And keep writing (ensures that leases are also persisted correctly)
stream.write(DATA_AFTER_RESTART);
stream.close();
// Verify that the data showed up, both from before and after the restart.
FSDataInputStream readStream = fs.open(FILE_PATH);
try {
byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length];
IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
assertArrayEquals(DATA_BEFORE_RESTART, verifyBuf);
IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
assertArrayEquals(DATA_AFTER_RESTART, verifyBuf);
} finally {
IOUtils.closeStream(readStream);
}
} finally {
if (cluster != null) { cluster.shutdown(); }
}
}
@Test
public void testRestartDfsWithAbandonedBlock() throws Exception {
final Configuration conf = new HdfsConfiguration();
// Turn off persistent IPC, so that the DFSClient can survive NN restart
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
MiniDFSCluster cluster = null;
long len = 0;
FSDataOutputStream stream;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = cluster.getFileSystem();
// Creating a file with 4096 blockSize to write multiple blocks
stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
stream.write(DATA_BEFORE_RESTART);
stream.hflush();
// Wait for all of the blocks to get through
while (len < BLOCK_SIZE * (NUM_BLOCKS - 1)) {
FileStatus status = fs.getFileStatus(FILE_PATH);
len = status.getLen();
Thread.sleep(100);
}
// Abandon the last block
DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(
FILE_NAME, 0, BLOCK_SIZE * NUM_BLOCKS);
assertEquals(NUM_BLOCKS, blocks.getLocatedBlocks().size());
LocatedBlock b = blocks.getLastLocatedBlock();
dfsclient.getNamenode().abandonBlock(b.getBlock(), FILE_NAME,
dfsclient.clientName);
// explicitly do NOT close the file.
cluster.restartNameNode();
// Check that the file has no less bytes than before the restart
// This would mean that blocks were successfully persisted to the log
FileStatus status = fs.getFileStatus(FILE_PATH);
assertTrue("Length incorrect: " + status.getLen(),
status.getLen() != len - BLOCK_SIZE);
// Verify the data showed up from before restart, sans abandoned block.
FSDataInputStream readStream = fs.open(FILE_PATH);
try {
byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length - BLOCK_SIZE];
IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
byte[] expectedBuf = new byte[DATA_BEFORE_RESTART.length - BLOCK_SIZE];
System.arraycopy(DATA_BEFORE_RESTART, 0,
expectedBuf, 0, expectedBuf.length);
assertArrayEquals(expectedBuf, verifyBuf);
} finally {
IOUtils.closeStream(readStream);
}
} finally {
if (cluster != null) { cluster.shutdown(); }
}
}
@Test
public void testRestartWithPartialBlockHflushed() throws IOException {
final Configuration conf = new HdfsConfiguration();
// Turn off persistent IPC, so that the DFSClient can survive NN restart
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
MiniDFSCluster cluster = null;
FSDataOutputStream stream;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = cluster.getFileSystem();
NameNode.getAddress(conf).getPort();
// Creating a file with 4096 blockSize to write multiple blocks
stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
stream.write(DATA_BEFORE_RESTART);
stream.write((byte)1);
stream.hflush();
// explicitly do NOT close the file before restarting the NN.
cluster.restartNameNode();
// this will fail if the final block of the file is prematurely COMPLETEd
stream.write((byte)2);
stream.hflush();
stream.close();
assertEquals(DATA_BEFORE_RESTART.length + 2,
fs.getFileStatus(FILE_PATH).getLen());
FSDataInputStream readStream = fs.open(FILE_PATH);
try {
byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length + 2];
IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
byte[] expectedBuf = new byte[DATA_BEFORE_RESTART.length + 2];
System.arraycopy(DATA_BEFORE_RESTART, 0, expectedBuf, 0,
DATA_BEFORE_RESTART.length);
System.arraycopy(new byte[]{1, 2}, 0, expectedBuf,
DATA_BEFORE_RESTART.length, 2);
assertArrayEquals(expectedBuf, verifyBuf);
} finally {
IOUtils.closeStream(readStream);
}
} finally {
if (cluster != null) { cluster.shutdown(); }
}
}
@Test
public void testRestartWithAppend() throws IOException {
final Configuration conf = new HdfsConfiguration();
// Turn off persistent IPC, so that the DFSClient can survive NN restart
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
0);
conf.setBoolean(DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY, true);
MiniDFSCluster cluster = null;
FSDataOutputStream stream;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
FileSystem fs = cluster.getFileSystem();
NameNode.getAddress(conf).getPort();
// Creating a file with 4096 blockSize to write multiple blocks
stream = fs.create(FILE_PATH, true, BLOCK_SIZE, (short) 1, BLOCK_SIZE);
stream.write(DATA_BEFORE_RESTART, 0, DATA_BEFORE_RESTART.length / 2);
stream.close();
stream = fs.append(FILE_PATH, BLOCK_SIZE);
stream.write(DATA_BEFORE_RESTART, DATA_BEFORE_RESTART.length / 2,
DATA_BEFORE_RESTART.length / 2);
stream.close();
assertEquals(DATA_BEFORE_RESTART.length,
fs.getFileStatus(FILE_PATH).getLen());
cluster.restartNameNode();
assertEquals(DATA_BEFORE_RESTART.length,
fs.getFileStatus(FILE_PATH).getLen());
FSDataInputStream readStream = fs.open(FILE_PATH);
try {
byte[] verifyBuf = new byte[DATA_BEFORE_RESTART.length];
IOUtils.readFully(readStream, verifyBuf, 0, verifyBuf.length);
assertArrayEquals(DATA_BEFORE_RESTART, verifyBuf);
} finally {
IOUtils.closeStream(readStream);
}
} finally {
if (cluster != null) { cluster.shutdown(); }
}
}
}

TestEditLog.java

@ -116,10 +116,12 @@ public class TestEditLog extends TestCase {
int numTransactions;
short replication = 3;
long blockSize = 64;
final int id;
Transactions(FSNamesystem ns, int num) {
Transactions(FSNamesystem ns, int num, int id) {
namesystem = ns;
numTransactions = num;
this.id = id;
}
// add a bunch of transactions.
@ -131,8 +133,9 @@ public class TestEditLog extends TestCase {
for (int i = 0; i < numTransactions; i++) {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
p, replication, blockSize, 0, "", "", null);
editLog.logOpenFile("/filename" + i, inode);
editLog.logCloseFile("/filename" + i, inode);
String fileName = "/filename-" + id + "-" + i;
editLog.logOpenFile(fileName, inode);
editLog.logCloseFile(fileName, inode);
editLog.logSync();
}
}
@ -280,7 +283,7 @@ public class TestEditLog extends TestCase {
// Create threads and make them run transactions concurrently.
Thread threadId[] = new Thread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS, i);
threadId[i] = new Thread(trans, "TransactionThread-" + i);
threadId[i].start();
}