diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 02ef47ec3e..9c00ea7037 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -119,6 +119,7 @@ import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.EncryptionZone; import org.apache.hadoop.hdfs.protocol.EncryptionZoneIterator; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; @@ -160,10 +161,10 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.io.Text; -import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.io.retry.LossyRetryInvocationHandler; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RpcNoSuchMethodException; import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; @@ -178,16 +179,15 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.net.InetAddresses; -import org.apache.htrace.core.Tracer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /******************************************************** * DFSClient can connect to a Hadoop Filesystem and @@ -1291,17 +1291,43 @@ public String getLinkTarget(String path) throws IOException { } } + /** + * Invoke namenode append RPC. + * It retries in case of {@link BlockNotYetCompleteException}. 
+ */ + private LastBlockWithStatus callAppend(String src, + EnumSetWritable<CreateFlag> flag) throws IOException { + final long startTime = Time.monotonicNow(); + for(;;) { + try { + return namenode.append(src, clientName, flag); + } catch(RemoteException re) { + if (Time.monotonicNow() - startTime > 5000 + || !RetriableException.class.getName().equals( + re.getClassName())) { + throw re; + } + + try { // sleep and retry + Thread.sleep(500); + } catch (InterruptedException e) { + throw DFSUtilClient.toInterruptedIOException("callAppend", e); + } + } + } + } + /** Method to get stream returned by append call */ private DFSOutputStream callAppend(String src, EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes) throws IOException { CreateFlag.validateForAppend(flag); try { - LastBlockWithStatus blkWithStatus = namenode.append(src, clientName, + final LastBlockWithStatus blkWithStatus = callAppend(src, new EnumSetWritable<>(flag, CreateFlag.class)); HdfsFileStatus status = blkWithStatus.getFileStatus(); if (status == null) { - DFSClient.LOG.debug("NameNode is on an older version, request file " + - "info with additional RPC call for file: " + src); + LOG.debug("NameNode is on an older version, request file " + + "info with additional RPC call for file: {}", src); status = getFileInfo(src); } return DFSOutputStream.newStreamForAppend(this, src, flag, progress, diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index c249338b82..bac4d121c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -475,8 +475,7 @@ void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{ setPipeline(lastBlock); if (nodes.length < 1) { throw new IOException("Unable to retrieve blocks locations " + - " for last block " + block + - "of file " + src); + " for last block " + block + " of file " + src); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 68d5de63d9..a14a1d8db0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1910,6 +1910,9 @@ Release 2.8.0 - UNRELEASED HDFS-9436. Make NNThroughputBenchmark$BlockReportStats run with 10 datanodes by default. (Mingliang Liu via shv) + HDFS-8999. Allow a file to be closed with COMMITTED but not yet COMPLETE + blocks. (szetszwo) + BUG FIXES HDFS-7501. TransactionsSinceLastCheckpoint can be negative on SBNs.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 9dd251fd8b..5217740799 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -203,6 +203,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_REPLICATION_MIN_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MIN_KEY; public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1; + public static final String DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY + = "dfs.namenode.file.close.num-committed-allowed"; + public static final int DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT + = 0; public static final String DFS_NAMENODE_STRIPE_MIN_KEY = "dfs.namenode.stripe.min"; public static final int DFS_NAMENODE_STRIPE_MIN_DEFAULT = 1; public static final String DFS_NAMENODE_SAFEMODE_REPLICATION_MIN_KEY = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index d255471419..a76429edaa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -641,6 +641,10 @@ public int getDefaultStorageNum(BlockInfo block) { } } + public short getMinReplication() { + return minReplication; + } + public short getMinStorageNum(BlockInfo block) { if (block.isStriped()) { return ((BlockInfoStriped) block).getRealDataBlockNum(); @@ -703,8 +707,8 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc, final boolean b = commitBlock(lastBlock, commitBlock); if (hasMinStorage(lastBlock)) { - if (b && !bc.isStriped()) { - addExpectedReplicasToPending(lastBlock); + if (b) { + addExpectedReplicasToPending(lastBlock, bc); } completeBlock(lastBlock, false); } @@ -716,6 +720,12 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc, * pendingReplications in order to keep ReplicationMonitor from scheduling * the block. */ + public void addExpectedReplicasToPending(BlockInfo blk, BlockCollection bc) { + if (!bc.isStriped()) { + addExpectedReplicasToPending(blk); + } + } + private void addExpectedReplicasToPending(BlockInfo lastBlock) { DatanodeStorageInfo[] expectedStorages = lastBlock.getUnderConstructionFeature().getExpectedStorageLocations(); @@ -2844,9 +2854,7 @@ private Block addStoredBlock(final BlockInfo block, if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && hasMinStorage(storedBlock, numLiveReplicas)) { - if (!bc.isStriped()) { - addExpectedReplicasToPending(storedBlock); - } + addExpectedReplicasToPending(storedBlock, bc); completeBlock(storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -3825,26 +3833,6 @@ public void checkReplication(BlockCollection bc) { } } - /** - * Check that the indicated blocks are present and - * replicated. 
- */ - public boolean checkBlocksProperlyReplicated( - String src, BlockInfo[] blocks) { - for (BlockInfo b: blocks) { - if (!b.isComplete()) { - final int numNodes = b.numNodes(); - final int min = getMinStorageNum(b); - final BlockUCState state = b.getBlockUCState(); - LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + state - + ", replication# = " + numNodes + (numNodes < min ? " < " : " >= ") - + " minimum = " + min + ") in file " + src); - return false; - } - } - return true; - } - /** * @return 0 if the block is not found; * otherwise, return the replication factor of the block. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java index ae84f39506..88d706b989 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAppendOp.java @@ -33,8 +33,10 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.RecoverLeaseOp; import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion.Feature; +import org.apache.hadoop.ipc.RetriableException; import com.google.common.base.Preconditions; @@ -126,10 +128,17 @@ static LastBlockWithStatus appendFile(final FSNamesystem fsn, final BlockInfo lastBlock = file.getLastBlock(); // Check that the block has at least minimum replication. 
- if (lastBlock != null && lastBlock.isComplete() + if (lastBlock != null) { + if (lastBlock.getBlockUCState() == BlockUCState.COMMITTED) { + throw new RetriableException( + new NotReplicatedYetException("append: lastBlock=" + + lastBlock + " of src=" + path + + " is COMMITTED but not yet COMPLETE.")); + } else if (lastBlock.isComplete() && !blockManager.isSufficientlyReplicated(lastBlock)) { - throw new IOException("append: lastBlock=" + lastBlock + " of src=" - + path + " is not sufficiently replicated yet."); + throw new IOException("append: lastBlock=" + lastBlock + " of src=" + + path + " is not sufficiently replicated yet."); + } } lb = prepareFileForAppend(fsn, iip, holder, clientMachine, newBlock, true, logRetryCache); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index 0a71d787cf..914fbd9de0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -789,8 +789,10 @@ private static boolean completeFileInternal( return false; } + fsn.addCommittedBlocksToPending(pendingFile); + fsn.finalizeINodeFileUnderConstruction(src, pendingFile, - Snapshot.CURRENT_STATE_ID); + Snapshot.CURRENT_STATE_ID, true); return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 5d27786ad3..1a9d7a9d87 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.namenode; -import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp; import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade; import static org.apache.hadoop.util.Time.monotonicNow; @@ -29,7 +28,6 @@ import java.util.EnumSet; import java.util.List; -import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -94,6 +92,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp; import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo; @@ -457,8 +456,9 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir, // One might expect that you could use removeLease(holder, path) here, // but OP_CLOSE doesn't serialize the holder. So, remove the inode. 
if (file.isUnderConstruction()) { - fsNamesys.leaseManager.removeLeases(Lists.newArrayList(file.getId())); - file.toCompleteFile(file.getModificationTime()); + fsNamesys.getLeaseManager().removeLease(file.getId()); + file.toCompleteFile(file.getModificationTime(), 0, + fsNamesys.getBlockManager().getMinReplication()); } break; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index afa41c100d..169395838a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -459,6 +459,7 @@ private void logAuditEvent(boolean succeeded, private final long minBlockSize; // minimum block size final long maxBlocksPerFile; // maximum # of blocks per file + private final int numCommittedAllowed; /** Lock to protect FSNamesystem. */ private final FSNamesystemLock fsLock; @@ -756,6 +757,9 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException { DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT); this.maxBlocksPerFile = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT); + this.numCommittedAllowed = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, + DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_DEFAULT); this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf); @@ -2594,17 +2598,36 @@ Block createNewBlock(boolean isStriped) throws IOException { boolean checkFileProgress(String src, INodeFile v, boolean checkall) { assert hasReadLock(); if (checkall) { - return blockManager.checkBlocksProperlyReplicated(src, v - .getBlocks()); + return checkBlocksComplete(src, true, v.getBlocks()); } else { - // check the penultimate block of this file - BlockInfo b = v.getPenultimateBlock(); - return b == null || - blockManager.checkBlocksProperlyReplicated( - src, new BlockInfo[] { b }); + final BlockInfo[] blocks = v.getBlocks(); + final int i = blocks.length - numCommittedAllowed - 2; + return i < 0 || blocks[i] == null + || checkBlocksComplete(src, false, blocks[i]); } } + /** + * Check if the blocks are COMPLETE; + * it may allow the last block to be COMMITTED. + */ + private boolean checkBlocksComplete(String src, boolean allowCommittedBlock, + BlockInfo... blocks) { + final int n = allowCommittedBlock? numCommittedAllowed: 0; + for(int i = 0; i < blocks.length; i++) { + final short min = blockManager.getMinStorageNum(blocks[i]); + final String err = INodeFile.checkBlockComplete(blocks, i, n, min); + if (err != null) { + final int numNodes = blocks[i].numNodes(); + LOG.info("BLOCK* " + err + "(numNodes= " + numNodes + + (numNodes < min ? " < " : " >= ") + + " minimum = " + min + ") in file " + src); + return false; + } + } + return true; + } + /** * Change the indicated filename. 
* @deprecated Use {@link #renameTo(String, String, boolean, @@ -2735,7 +2758,9 @@ void removeLeasesAndINodes(List<Long> removedUCFiles, List<INode> removedINodes, final boolean acquireINodeMapLock) { assert hasWriteLock(); - leaseManager.removeLeases(removedUCFiles); + for(long i : removedUCFiles) { + leaseManager.removeLease(i); + } // remove inodes from inodesMap if (removedINodes != null) { if (acquireINodeMapLock) { @@ -2994,7 +3019,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip, // then reap lease immediately and close the file. if(nrCompleteBlocks == nrBlocks) { finalizeINodeFileUnderConstruction(src, pendingFile, - iip.getLatestSnapshotId()); + iip.getLatestSnapshotId(), false); NameNode.stateChangeLog.warn("BLOCK*" + " internalReleaseLease: All existing blocks are COMPLETE," + " lease removed, file closed."); @@ -3033,7 +3058,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip, if(penultimateBlockMinStorage && blockManager.hasMinStorage(lastBlock)) { finalizeINodeFileUnderConstruction(src, pendingFile, - iip.getLatestSnapshotId()); + iip.getLatestSnapshotId(), false); NameNode.stateChangeLog.warn("BLOCK*" + " internalReleaseLease: Committed blocks are minimally replicated," + " lease removed, file closed."); @@ -3077,7 +3102,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip, // We can remove this block and close the file. pendingFile.removeLastBlock(lastBlock); finalizeINodeFileUnderConstruction(src, pendingFile, - iip.getLatestSnapshotId()); + iip.getLatestSnapshotId(), false); NameNode.stateChangeLog.warn("BLOCK* internalReleaseLease: " + "Removed empty last block and closed file."); return true; @@ -3163,8 +3188,23 @@ void commitOrCompleteLastBlock( } } - void finalizeINodeFileUnderConstruction( - String src, INodeFile pendingFile, int latestSnapshot) throws IOException { + void addCommittedBlocksToPending(final INodeFile pendingFile) { + final BlockInfo[] blocks = pendingFile.getBlocks(); + int i = blocks.length - numCommittedAllowed; + if (i < 0) { + i = 0; + } + for(; i < blocks.length; i++) { + final BlockInfo b = blocks[i]; + if (b != null && b.getBlockUCState() == BlockUCState.COMMITTED) { + // b is COMMITTED but not yet COMPLETE, add it to pending replication. + blockManager.addExpectedReplicasToPending(b, pendingFile); + } + } + } + + void finalizeINodeFileUnderConstruction(String src, INodeFile pendingFile, + int latestSnapshot, boolean allowCommittedBlock) throws IOException { assert hasWriteLock(); FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature(); @@ -3179,7 +3219,9 @@ void finalizeINodeFileUnderConstruction( // The file is no longer pending. // Create permanent INode, update blocks. No need to replace the inode here // since we just remove the uc feature from pendingFile - pendingFile.toCompleteFile(now()); + pendingFile.toCompleteFile(now(), + allowCommittedBlock?
numCommittedAllowed: 0, + blockManager.getMinReplication()); // close file and persist block allocations for this file closeFile(src, pendingFile); @@ -3412,8 +3454,8 @@ void closeFileCommitBlocks(String src, INodeFile pendingFile, commitOrCompleteLastBlock(pendingFile, iip, storedBlock); //remove lease, close file - finalizeINodeFileUnderConstruction(src, pendingFile, - Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID)); + int s = Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID); + finalizeINodeFileUnderConstruction(src, pendingFile, s, false); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 962a2825f4..353f29b219 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -224,30 +224,58 @@ INodeFile toUnderConstruction(String clientName, String clientMachine) { * Convert the file to a complete file, i.e., to remove the Under-Construction * feature. */ - public INodeFile toCompleteFile(long mtime) { - Preconditions.checkState(isUnderConstruction(), - "file is no longer under construction"); - FileUnderConstructionFeature uc = getFileUnderConstructionFeature(); - if (uc != null) { - assertAllBlocksComplete(); - removeFeature(uc); - this.setModificationTime(mtime); - } - return this; + void toCompleteFile(long mtime, int numCommittedAllowed, short minReplication) { + final FileUnderConstructionFeature uc = getFileUnderConstructionFeature(); + Preconditions.checkNotNull(uc, "File %s is not under construction", this); + assertAllBlocksComplete(numCommittedAllowed, minReplication); + removeFeature(uc); + setModificationTime(mtime); } /** Assert all blocks are complete. */ - private void assertAllBlocksComplete() { + private void assertAllBlocksComplete(int numCommittedAllowed, + short minReplication) { if (blocks == null) { return; } for (int i = 0; i < blocks.length; i++) { - Preconditions.checkState(blocks[i].isComplete(), "Failed to finalize" - + " %s %s since blocks[%s] is non-complete, where blocks=%s.", - getClass().getSimpleName(), this, i, Arrays.asList(blocks)); + final String err = checkBlockComplete(blocks, i, numCommittedAllowed, + minReplication); + Preconditions.checkState(err == null, + "Unexpected block state: %s, file=%s (%s), blocks=%s (i=%s)", + err, this, getClass().getSimpleName(), Arrays.asList(blocks), i); } } + /** + * Check if the i-th block is COMPLETE; + * when the i-th block is the last block, it may be allowed to be COMMITTED. + * + * @return null if the block passes the check; + * otherwise, return an error message. 
+ */ + static String checkBlockComplete(BlockInfo[] blocks, int i, + int numCommittedAllowed, short minReplication) { + final BlockInfo b = blocks[i]; + final BlockUCState state = b.getBlockUCState(); + if (state == BlockUCState.COMPLETE) { + return null; + } + if (b.isStriped() || i < blocks.length - numCommittedAllowed) { + return b + " is " + state + " but not COMPLETE"; + } + if (state != BlockUCState.COMMITTED) { + return b + " is " + state + " but neither COMPLETE nor COMMITTED"; + } + final int numExpectedLocations + = b.getUnderConstructionFeature().getNumExpectedLocations(); + if (numExpectedLocations <= minReplication) { + return b + " is " + state + " but numExpectedLocations = " + + numExpectedLocations + " <= minReplication = " + minReplication; + } + return null; + } + @Override // BlockCollection public void setBlock(int index, BlockInfo blk) { Preconditions.checkArgument(blk.isStriped() == this.isStriped()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index cec9313819..e97aa53744 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -160,6 +160,13 @@ synchronized Lease addLease(String holder, long inodeId) { return lease; } + synchronized void removeLease(long inodeId) { + final Lease lease = leasesById.get(inodeId); + if (lease != null) { + removeLease(lease, inodeId); + } + } + /** * Remove the specified lease and src. */ @@ -298,16 +305,6 @@ long getLastUpdate() { } } - @VisibleForTesting - synchronized void removeLeases(Collection<Long> inodes) { - for (long inode : inodes) { - Lease lease = leasesById.get(inode); - if (lease != null) { - removeLease(lease, inode); - } - } - } - public void setLeasePeriod(long softLimit, long hardLimit) { this.softLimit = softLimit; this.hardLimit = hardLimit; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index 11d1b180fb..56b65909b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -27,10 +27,12 @@ import java.net.InetSocketAddress; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HardLink; @@ -41,12 +43,12 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.Time; import 
org.junit.Assert; import org.junit.Test; @@ -55,6 +57,8 @@ * support HDFS appends. */ public class TestFileAppend{ + private static final long RANDOM_TEST_RUNTIME = 10000; + final boolean simulatedStorage = false; private static byte[] fileContents = null; @@ -381,6 +385,56 @@ public void testAppend2Twice() throws Exception { } } + + @Test + public void testMultipleAppends() throws Exception { + final long startTime = Time.monotonicNow(); + final Configuration conf = new HdfsConfiguration(); + conf.setInt( + DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(4).build(); + final DistributedFileSystem fs = cluster.getFileSystem(); + try { + final Path p = new Path("/testMultipleAppend/foo"); + final int blockSize = 1 << 16; + final byte[] data = AppendTestUtil.initBuffer(blockSize); + + // create an empty file. + fs.create(p, true, 4096, (short)3, blockSize).close(); + + int fileLen = 0; + for(int i = 0; + i < 10 || Time.monotonicNow() - startTime < RANDOM_TEST_RUNTIME; + i++) { + int appendLen = ThreadLocalRandom.current().nextInt(100) + 1; + if (fileLen + appendLen > data.length) { + break; + } + + AppendTestUtil.LOG.info(i + ") fileLen=" + fileLen + + ", appendLen=" + appendLen); + final FSDataOutputStream out = fs.append(p); + out.write(data, fileLen, appendLen); + out.close(); + fileLen += appendLen; + } + + Assert.assertEquals(fileLen, fs.getFileStatus(p).getLen()); + final byte[] actual = new byte[fileLen]; + final FSDataInputStream in = fs.open(p); + in.readFully(actual); + in.close(); + for(int i = 0; i < fileLen; i++) { + Assert.assertEquals(data[i], actual[i]); + } + } finally { + fs.close(); + cluster.shutdown(); + } + } + /** Tests appending after soft-limit expires. 
*/ @Test public void testAppendAfterSoftLimit() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index 4152712af6..47c3ace06b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; @@ -42,20 +41,19 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.StripedFileTestUtil; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; @@ -473,7 +471,7 @@ public void testAddNewStripedBlock() throws IOException{ file.toUnderConstruction(clientName, clientMachine); file.addBlock(stripedBlk); fns.getEditLog().logAddBlock(testFilePath, file); - file.toCompleteFile(System.currentTimeMillis()); + TestINodeFile.toCompleteFile(file); //If the block by loaded is the same as above it means that //we have successfully applied the edit log to the fsimage. 
@@ -539,7 +537,7 @@ public void testUpdateStripedBlocks() throws IOException{ file.toUnderConstruction(clientName, clientMachine); file.addBlock(stripedBlk); fns.getEditLog().logAddBlock(testFilePath, file); - file.toCompleteFile(System.currentTimeMillis()); + TestINodeFile.toCompleteFile(file); fns.enterSafeMode(false); fns.saveNamespace(0, 0); fns.leaveSafeMode(false); @@ -551,7 +549,7 @@ public void testUpdateStripedBlocks() throws IOException{ file.getLastBlock().setNumBytes(newBlkNumBytes); file.getLastBlock().setGenerationStamp(newTimestamp); fns.getEditLog().logUpdateBlocks(testFilePath, file, true); - file.toCompleteFile(System.currentTimeMillis()); + TestINodeFile.toCompleteFile(file); //After the namenode restarts if the block by loaded is the same as above //(new block size and timestamp) it means that we have successfully @@ -616,7 +614,7 @@ public void testHasNonEcBlockUsingStripedIDForAddBlock() throws IOException{ file.toUnderConstruction(clientName, clientMachine); file.addBlock(cBlk); fns.getEditLog().logAddBlock(testFilePath, file); - file.toCompleteFile(System.currentTimeMillis()); + TestINodeFile.toCompleteFile(file); cluster.restartNameNodes(); cluster.waitActive(); fns = cluster.getNamesystem(); @@ -662,7 +660,7 @@ public void testHasNonEcBlockUsingStripedIDForUpdateBlocks() INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); file.toUnderConstruction(clientName, clientMachine); file.addBlock(cBlk); - file.toCompleteFile(System.currentTimeMillis()); + TestINodeFile.toCompleteFile(file); long newBlkNumBytes = 1024*8; long newTimestamp = 1426222918+3600; @@ -671,7 +669,7 @@ public void testHasNonEcBlockUsingStripedIDForUpdateBlocks() file.getLastBlock().setNumBytes(newBlkNumBytes); file.getLastBlock().setGenerationStamp(newTimestamp); fns.getEditLog().logUpdateBlocks(testFilePath, file, true); - file.toCompleteFile(System.currentTimeMillis()); + TestINodeFile.toCompleteFile(file); cluster.restartNameNodes(); cluster.waitActive(); fns = cluster.getNamesystem(); @@ -685,5 +683,4 @@ public void testHasNonEcBlockUsingStripedIDForUpdateBlocks() } } } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java index 0c7398c16d..8e39bca8f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java @@ -509,7 +509,7 @@ public void testHasNonEcBlockUsingStripedIDForLoadFile() throws IOException{ INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); file.toUnderConstruction(clientName, clientMachine); file.addBlock(cBlk); - file.toCompleteFile(System.currentTimeMillis()); + TestINodeFile.toCompleteFile(file); fns.enterSafeMode(false); fns.saveNamespace(0, 0); cluster.restartNameNodes(); @@ -617,7 +617,7 @@ public void testHasNonEcBlockUsingStripedIDForLoadSnapshot() INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath); file.toUnderConstruction(clientName, clientMachine); file.addBlock(cBlk); - file.toCompleteFile(System.currentTimeMillis()); + TestINodeFile.toCompleteFile(file); fs.createSnapshot(d,"testHasNonEcBlockUsingStripeID"); fs.truncate(p,0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java index 98e8426cc1..68519ab59b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java @@ -94,6 +94,10 @@ static public INodeFile createINodeFile(long id) { (short)3, 1024L); } + static void toCompleteFile(INodeFile file) { + file.toCompleteFile(Time.now(), 0, (short)1); + } + INodeFile createINodeFile(short replication, long preferredBlockSize) { return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, null, replication, preferredBlockSize); @@ -1130,7 +1134,7 @@ public void testFileUnderConstruction() { assertEquals(clientName, uc.getClientName()); assertEquals(clientMachine, uc.getClientMachine()); - file.toCompleteFile(Time.now()); + toCompleteFile(file); assertFalse(file.isUnderConstruction()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java index de301617bd..3bb7bb71bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestLeaseManager.java @@ -51,8 +51,8 @@ public void testRemoveLeases() throws Exception { } assertEquals(4, lm.getINodeIdWithLeases().size()); - synchronized (lm) { - lm.removeLeases(ids); + for (long id : ids) { + lm.removeLease(id); } assertEquals(0, lm.getINodeIdWithLeases().size()); }
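Usage sketch (not part of the patch): the new dfs.namenode.file.close.num-committed-allowed setting lets the NameNode close a file while up to that many trailing blocks are still COMMITTED rather than COMPLETE, and the new DFSClient#callAppend retry covers the window where an append hits such a block. The snippet below mirrors the new TestFileAppend#testMultipleAppends above; the class name, path, and payload are illustrative, not from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class CommittedCloseExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Allow one trailing block to be COMMITTED (not yet COMPLETE) when a file is closed.
        conf.setInt(
            DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1);
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
        try {
          DistributedFileSystem fs = cluster.getFileSystem();
          Path p = new Path("/demo/foo");
          // Create an empty file with a small block size, as the test does.
          fs.create(p, true, 4096, (short) 3, 1 << 16).close();
          // If the previous close left the last block COMMITTED, the append RPC is
          // retried by DFSClient#callAppend until the block becomes COMPLETE.
          try (FSDataOutputStream out = fs.append(p)) {
            out.write(new byte[] {1, 2, 3});
          }
        } finally {
          cluster.shutdown();
        }
      }
    }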