From 627da6f7178e18aa41996969c408b6f344e297d1 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Wed, 15 Feb 2017 10:44:37 -0800 Subject: [PATCH] HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao. --- .../org/apache/hadoop/hdfs/DataStreamer.java | 100 ++++++++++++------ .../hadoop/hdfs/StripedDataStreamer.java | 8 +- .../hadoop/hdfs/TestDFSOutputStream.java | 3 +- 3 files changed, 72 insertions(+), 39 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 8e6eb63abe..026853736d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -142,8 +142,6 @@ boolean continueRetryingOrThrow() throws InvalidEncryptionKeyException { /** * Record a connection exception. - * @param e - * @throws InvalidEncryptionKeyException */ void recordFailure(final InvalidEncryptionKeyException e) throws InvalidEncryptionKeyException { @@ -178,9 +176,8 @@ void sendTransferBlock(final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, final Token blockToken) throws IOException { //send the TRANSFER_BLOCK request - new Sender(out) - .transferBlock(block, blockToken, dfsClient.clientName, targets, - targetStorageTypes); + new Sender(out).transferBlock(block.getCurrentBlock(), blockToken, + dfsClient.clientName, targets, targetStorageTypes); out.flush(); //ack BlockOpResponseProto transferResponse = BlockOpResponseProto @@ -199,6 +196,42 @@ public void close() throws IOException { } } + static class BlockToWrite { + private ExtendedBlock currentBlock; + + BlockToWrite(ExtendedBlock block) { + setCurrentBlock(block); + } + + synchronized ExtendedBlock getCurrentBlock() { + return currentBlock == null ? null : new ExtendedBlock(currentBlock); + } + + synchronized long getNumBytes() { + return currentBlock == null ? 0 : currentBlock.getNumBytes(); + } + + synchronized void setCurrentBlock(ExtendedBlock block) { + currentBlock = (block == null || block.getLocalBlock() == null) ? + null : new ExtendedBlock(block); + } + + synchronized void setNumBytes(long numBytes) { + assert currentBlock != null; + currentBlock.setNumBytes(numBytes); + } + + synchronized void setGenerationStamp(long generationStamp) { + assert currentBlock != null; + currentBlock.setGenerationStamp(generationStamp); + } + + @Override + public synchronized String toString() { + return currentBlock == null ? "null" : currentBlock.toString(); + } + } + /** * Create a socket for a write pipeline * @@ -440,7 +473,7 @@ synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) { } private volatile boolean streamerClosed = false; - protected volatile ExtendedBlock block; // its length is number of bytes acked + protected final BlockToWrite block; // its length is number of bytes acked protected Token accessToken; private DataOutputStream blockStream; private DataInputStream blockReplyStream; @@ -508,7 +541,7 @@ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, ByteArrayManager byteArrayManage, boolean isAppend, String[] favoredNodes, EnumSet flags) { - this.block = block; + this.block = new BlockToWrite(block); this.dfsClient = dfsClient; this.src = src; this.progress = progress; @@ -1322,7 +1355,7 @@ private void addDatanode2ExistingPipeline() throws IOException { LocatedBlock lb; //get a new datanode lb = dfsClient.namenode.getAdditionalDatanode( - src, stat.getFileId(), block, nodes, storageIDs, + src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs, exclude.toArray(new DatanodeInfo[exclude.size()]), 1, dfsClient.clientName); // a new node was allocated by the namenode. Update nodes. @@ -1440,7 +1473,7 @@ protected void setupPipelineInternal(DatanodeInfo[] datanodes, } // while if (success) { - block = updatePipeline(newGS); + updatePipeline(newGS); } } @@ -1536,21 +1569,22 @@ void failPacket4Testing() { } private LocatedBlock updateBlockForPipeline() throws IOException { - return dfsClient.namenode.updateBlockForPipeline(block, + return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(), dfsClient.clientName); } - static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) { - return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(), - b.getNumBytes(), newGS); + void updateBlockGS(final long newGS) { + block.setGenerationStamp(newGS); } /** update pipeline at the namenode */ - ExtendedBlock updatePipeline(long newGS) throws IOException { - final ExtendedBlock newBlock = newBlock(block, newGS); - dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, - nodes, storageIDs); - return newBlock; + private void updatePipeline(long newGS) throws IOException { + final ExtendedBlock oldBlock = block.getCurrentBlock(); + // the new GS has been propagated to all DN, it should be ok to update the + // local block state + updateBlockGS(newGS); + dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, + block.getCurrentBlock(), nodes, storageIDs); } DatanodeInfo[] getExcludedNodes() { @@ -1570,31 +1604,29 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { StorageType[] storageTypes; int count = dfsClient.getConf().getNumBlockWriteRetry(); boolean success; - ExtendedBlock oldBlock = block; + final ExtendedBlock oldBlock = block.getCurrentBlock(); do { errorState.resetInternalError(); lastException.clear(); DatanodeInfo[] excluded = getExcludedNodes(); - block = oldBlock; - lb = locateFollowingBlock(excluded.length > 0 ? excluded : null); - block = lb.getBlock(); + lb = locateFollowingBlock( + excluded.length > 0 ? excluded : null, oldBlock); + block.setCurrentBlock(lb.getBlock()); block.setNumBytes(0); bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); storageTypes = lb.getStorageTypes(); - // // Connect to first DataNode in the list. - // success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { LOG.warn("Abandoning " + block); - dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, - dfsClient.clientName); - block = null; + dfsClient.namenode.abandonBlock(block.getCurrentBlock(), + stat.getFileId(), src, dfsClient.clientName); + block.setCurrentBlock(null); final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; LOG.warn("Excluding datanode " + badNode); excludedNodes.put(badNode, badNode); @@ -1655,7 +1687,7 @@ boolean createBlockOutputStream(DatanodeInfo[] nodes, // We cannot change the block length in 'block' as it counts the number // of bytes ack'ed. - ExtendedBlock blockCopy = new ExtendedBlock(block); + ExtendedBlock blockCopy = block.getCurrentBlock(); blockCopy.setNumBytes(stat.getBlockSize()); boolean[] targetPinnings = getPinnings(nodes); @@ -1765,9 +1797,9 @@ private boolean[] getPinnings(DatanodeInfo[] nodes) { } } - private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) - throws IOException { - return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block, + private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded, + ExtendedBlock oldBlock) throws IOException { + return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock, stat.getFileId(), favoredNodes, addBlockFlags); } @@ -1811,7 +1843,7 @@ private void backOffIfNecessary() throws InterruptedException { * @return the block this streamer is writing to */ ExtendedBlock getBlock() { - return block; + return block.getCurrentBlock(); } /** @@ -2016,6 +2048,8 @@ void closeSocket() throws IOException { @Override public String toString() { - return block == null? "block==null": "" + block.getLocalBlock(); + final ExtendedBlock extendedBlock = block.getCurrentBlock(); + return extendedBlock == null ? + "block==null" : "" + extendedBlock.getLocalBlock(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index 89ab6a3e03..b457edbf8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -71,7 +71,7 @@ boolean isHealthy() { @Override protected void endBlock() { - coordinator.offerEndBlock(index, block); + coordinator.offerEndBlock(index, block.getCurrentBlock()); super.endBlock(); } @@ -93,7 +93,7 @@ private LocatedBlock getFollowingBlock() throws IOException { protected LocatedBlock nextBlockOutputStream() throws IOException { boolean success; LocatedBlock lb = getFollowingBlock(); - block = lb.getBlock(); + block.setCurrentBlock(lb.getBlock()); block.setNumBytes(0); bytesSent = 0; accessToken = lb.getBlockToken(); @@ -105,7 +105,7 @@ protected LocatedBlock nextBlockOutputStream() throws IOException { success = createBlockOutputStream(nodes, storageTypes, 0L, false); if (!success) { - block = null; + block.setCurrentBlock(null); final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()]; LOG.warn("Excluding datanode " + badNode); excludedNodes.put(badNode, badNode); @@ -161,7 +161,7 @@ protected void setupPipelineInternal(DatanodeInfo[] nodes, success = coordinator.takeStreamerUpdateResult(index); if (success) { // if all succeeded, update its block using the new GS - block = newBlock(block, newGS); + updateBlockGS(newGS); } else { // otherwise close the block stream and restart the recovery process closeStream(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java index 750103d19d..9ec01b6dbc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java @@ -110,8 +110,7 @@ public void testCloseTwice() throws IOException { * packet size < 64kB. See HDFS-7308 for details. */ @Test - public void testComputePacketChunkSize() - throws Exception { + public void testComputePacketChunkSize() throws Exception { DistributedFileSystem fs = cluster.getFileSystem(); FSDataOutputStream os = fs.create(new Path("/test")); DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,