diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java index 93a5948007..dc5a77fab2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java @@ -31,15 +31,21 @@ @InterfaceAudience.Private @InterfaceStability.Evolving public class LocatedStripedBlock extends LocatedBlock { + private static final int[] EMPTY_INDICES = {}; + private int[] blockIndices; public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs, StorageType[] storageTypes, int[] indices, long startOffset, boolean corrupt, DatanodeInfo[] cachedLocs) { super(b, locs, storageIDs, storageTypes, startOffset, corrupt, cachedLocs); - assert indices != null && indices.length == locs.length; - this.blockIndices = new int[indices.length]; - System.arraycopy(indices, 0, blockIndices, 0, indices.length); + + if (indices == null) { + this.blockIndices = EMPTY_INDICES; + } else { + this.blockIndices = new int[indices.length]; + System.arraycopy(indices, 0, blockIndices, 0, indices.length); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index 48bc9d6919..b608b10722 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -238,3 +238,6 @@ HDFS-8428. Erasure Coding: Fix the NullPointerException when deleting file. (Yi Liu via zhz). + + HDFS-8323. Bump GenerationStamp for write faliure in DFSStripedOutputStream. + (Tsz Wo Nicholas Sze via jing9) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 4399a37f85..8eed6ada47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -33,7 +33,6 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CreateFlag; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -61,52 +60,72 @@ @InterfaceAudience.Private public class DFSStripedOutputStream extends DFSOutputStream { - /** Coordinate the communication between the streamers. */ - static class Coordinator { - private final DfsClientConf conf; - private final List> endBlocks; - private final List> stripedBlocks; - private volatile boolean shouldLocateFollowingBlock = false; + static class MultipleBlockingQueue { + private final int pullTimeout; + private final List> queues; - Coordinator(final DfsClientConf conf, final int numDataBlocks, - final int numAllBlocks) { - this.conf = conf; - endBlocks = new ArrayList<>(numDataBlocks); - for (int i = 0; i < numDataBlocks; i++) { - endBlocks.add(new LinkedBlockingQueue(1)); + MultipleBlockingQueue(int numQueue, int queueSize, int pullTimeout) { + queues = new ArrayList<>(numQueue); + for (int i = 0; i < numQueue; i++) { + queues.add(new LinkedBlockingQueue(queueSize)); } - stripedBlocks = new ArrayList<>(numAllBlocks); - for (int i = 0; i < numAllBlocks; i++) { - stripedBlocks.add(new LinkedBlockingQueue(1)); + this.pullTimeout = pullTimeout; + } + + void offer(int i, T object) { + final boolean b = queues.get(i).offer(object); + Preconditions.checkState(b, "Failed to offer " + object + + " to queue, i=" + i); + } + + T poll(int i) throws InterruptedIOException { + try { + return queues.get(i).poll(pullTimeout, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw DFSUtil.toInterruptedIOException("poll interrupted, i=" + i, e); } } - boolean shouldLocateFollowingBlock() { - return shouldLocateFollowingBlock; + T peek(int i) { + return queues.get(i).peek(); + } + } + + /** Coordinate the communication between the streamers. */ + static class Coordinator { + private final MultipleBlockingQueue stripedBlocks; + private final MultipleBlockingQueue endBlocks; + private final MultipleBlockingQueue updateBlocks; + + Coordinator(final DfsClientConf conf, final int numDataBlocks, + final int numAllBlocks) { + stripedBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1, + conf.getStripedWriteMaxSecondsGetStripedBlock()); + endBlocks = new MultipleBlockingQueue<>(numDataBlocks, 1, + conf.getStripedWriteMaxSecondsGetEndedBlock()); + updateBlocks = new MultipleBlockingQueue<>(numAllBlocks, 1, + conf.getStripedWriteMaxSecondsGetStripedBlock()); } void putEndBlock(int i, ExtendedBlock block) { - shouldLocateFollowingBlock = true; - - final boolean b = endBlocks.get(i).offer(block); - Preconditions.checkState(b, "Failed to add " + block - + " to endBlocks queue, i=" + i); + endBlocks.offer(i, block); } ExtendedBlock getEndBlock(int i) throws InterruptedIOException { - try { - return endBlocks.get(i).poll( - conf.getStripedWriteMaxSecondsGetEndedBlock(), - TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw DFSUtil.toInterruptedIOException( - "getEndBlock interrupted, i=" + i, e); - } + return endBlocks.poll(i); + } + + void putUpdateBlock(int i, ExtendedBlock block) { + updateBlocks.offer(i, block); + } + + ExtendedBlock getUpdateBlock(int i) throws InterruptedIOException { + return updateBlocks.poll(i); } void setBytesEndBlock(int i, long newBytes, ExtendedBlock block) { - ExtendedBlock b = endBlocks.get(i).peek(); + ExtendedBlock b = endBlocks.peek(i); if (b == null) { // streamer just has failed, put end block and continue b = block; @@ -119,22 +138,11 @@ void putStripedBlock(int i, LocatedBlock block) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("putStripedBlock " + block + ", i=" + i); } - final boolean b = stripedBlocks.get(i).offer(block); - if (!b) { - throw new IOException("Failed: " + block + ", i=" + i); - } + stripedBlocks.offer(i, block); } LocatedBlock getStripedBlock(int i) throws IOException { - final LocatedBlock lb; - try { - lb = stripedBlocks.get(i).poll( - conf.getStripedWriteMaxSecondsGetStripedBlock(), - TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e); - } - + final LocatedBlock lb = stripedBlocks.poll(i); if (lb == null) { throw new IOException("Failed: i=" + i); } @@ -218,6 +226,11 @@ private StripedDataStreamer getLeadingStreamer() { return streamers.get(0); } + @Override + ExtendedBlock getBlock() { + return getLeadingStreamer().getBlock(); + } + /** Construct a new output stream for creating a file. */ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, EnumSet flag, Progressable progress, @@ -292,6 +305,7 @@ private void checkStreamers() throws IOException { int count = 0; for(StripedDataStreamer s : streamers) { if (!s.isFailed()) { + s.getErrorState().initExtenalError(); count++; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 8f07341f74..1344d54d61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -44,7 +44,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; @@ -210,6 +209,7 @@ synchronized void throwException4Close() throws IOException { static class ErrorState { private boolean error = false; + private boolean extenalError = false; private int badNodeIndex = -1; private int restartingNodeIndex = -1; private long restartingNodeDeadline = 0; @@ -221,6 +221,7 @@ static class ErrorState { synchronized void reset() { error = false; + extenalError = false; badNodeIndex = -1; restartingNodeIndex = -1; restartingNodeDeadline = 0; @@ -231,13 +232,19 @@ synchronized boolean hasError() { } synchronized boolean hasDatanodeError() { - return error && isNodeMarked(); + return error && (isNodeMarked() || extenalError); } synchronized void setError(boolean err) { this.error = err; } + synchronized void initExtenalError() { + setError(true); + this.extenalError = true; + } + + synchronized void setBadNodeIndex(int index) { this.badNodeIndex = index; } @@ -1736,6 +1743,10 @@ Token getBlockToken() { return accessToken; } + ErrorState getErrorState() { + return errorState; + } + /** * Put a packet to the data queue * diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index 258fc6505d..7b7db75329 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -94,36 +94,69 @@ protected void endBlock() { protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException { if (isLeadingStreamer()) { - if (coordinator.shouldLocateFollowingBlock()) { + if (block != null) { // set numByte for the previous block group long bytes = 0; for (int i = 0; i < NUM_DATA_BLOCKS; i++) { final ExtendedBlock b = coordinator.getEndBlock(i); - bytes += b == null ? 0 : b.getNumBytes(); + if (b != null) { + StripedBlockUtil.checkBlocks(block, i, b); + bytes += b.getNumBytes(); + } } block.setNumBytes(bytes); } - final LocatedStripedBlock lsb - = (LocatedStripedBlock)super.locateFollowingBlock(excludedNodes); - if (LOG.isDebugEnabled()) { - LOG.debug("Obtained block group " + lsb); - } - LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(lsb, - BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); - - assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) : - "Fail to get block group from namenode: blockGroupSize: " + - (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " + - blocks.length; - for (int i = 0; i < blocks.length; i++) { - coordinator.putStripedBlock(i, blocks[i]); - } + putLoactedBlocks(super.locateFollowingBlock(excludedNodes)); } return coordinator.getStripedBlock(index); } + void putLoactedBlocks(LocatedBlock lb) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Obtained block group " + lb); + } + LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( + (LocatedStripedBlock)lb, + BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); + + // TODO allow write to continue if blocks.length >= NUM_DATA_BLOCKS + assert blocks.length == (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) : + "Fail to get block group from namenode: blockGroupSize: " + + (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS) + ", blocks.length: " + + blocks.length; + for (int i = 0; i < blocks.length; i++) { + coordinator.putStripedBlock(i, blocks[i]); + } + } + + @Override + LocatedBlock updateBlockForPipeline() throws IOException { + if (isLeadingStreamer()) { + final LocatedBlock updated = super.updateBlockForPipeline(); + final ExtendedBlock block = updated.getBlock(); + for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { + final LocatedBlock lb = new LocatedBlock(block, null, null, null, + -1, updated.isCorrupt(), null); + lb.setBlockToken(updated.getBlockToken()); + coordinator.putStripedBlock(i, lb); + } + } + return coordinator.getStripedBlock(index); + } + + @Override + ExtendedBlock updatePipeline(long newGS) throws IOException { + if (isLeadingStreamer()) { + final ExtendedBlock newBlock = super.updatePipeline(newGS); + for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { + coordinator.putUpdateBlock(i, new ExtendedBlock(newBlock)); + } + } + return coordinator.getUpdateBlock(index); + } + @Override public String toString() { return "#" + index + ": isFailed? " + Boolean.toString(isFailed).charAt(0) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java index 9b5a923862..2275d91bcc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java @@ -21,18 +21,15 @@ import java.util.Iterator; import java.util.List; -import com.google.common.annotations.VisibleForTesting; - import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import com.google.common.annotations.VisibleForTesting; + /** * A Datanode has one or more storages. A storage in the Datanode is represented * by this class. @@ -41,7 +38,7 @@ public class DatanodeStorageInfo { public static final DatanodeStorageInfo[] EMPTY_ARRAY = {}; public static DatanodeInfo[] toDatanodeInfos(DatanodeStorageInfo[] storages) { - return toDatanodeInfos(Arrays.asList(storages)); + return storages == null? null: toDatanodeInfos(Arrays.asList(storages)); } static DatanodeInfo[] toDatanodeInfos(List storages) { final DatanodeInfo[] datanodes = new DatanodeInfo[storages.size()]; @@ -61,6 +58,9 @@ static DatanodeDescriptor[] toDatanodeDescriptors( } public static String[] toStorageIDs(DatanodeStorageInfo[] storages) { + if (storages == null) { + return null; + } String[] storageIDs = new String[storages.length]; for(int i = 0; i < storageIDs.length; i++) { storageIDs[i] = storages[i].getStorageID(); @@ -69,6 +69,9 @@ public static String[] toStorageIDs(DatanodeStorageInfo[] storages) { } public static StorageType[] toStorageTypes(DatanodeStorageInfo[] storages) { + if (storages == null) { + return null; + } StorageType[] storageTypes = new StorageType[storages.length]; for(int i = 0; i < storageTypes.length; i++) { storageTypes[i] = storages[i].getStorageType(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 60f86d6fa3..7e45e9076c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -5787,29 +5787,30 @@ void reportBadBlocks(LocatedBlock[] blocks) throws IOException { * Get a new generation stamp together with an access token for * a block under construction * - * This method is called for recovering a failed pipeline or setting up - * a pipeline to append to a block. + * This method is called for recovering a failed write or setting up + * a block for appended. * * @param block a block * @param clientName the name of a client * @return a located block with a new generation stamp and an access token * @throws IOException if any error occurs */ - LocatedBlock updateBlockForPipeline(ExtendedBlock block, + LocatedBlock bumpBlockGenerationStamp(ExtendedBlock block, String clientName) throws IOException { - LocatedBlock locatedBlock; + final LocatedBlock locatedBlock; checkOperation(OperationCategory.WRITE); writeLock(); try { checkOperation(OperationCategory.WRITE); // check vadility of parameters - checkUCBlock(block, clientName); + final INodeFile file = checkUCBlock(block, clientName); // get a new generation stamp and an access token block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock()))); - locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]); - blockManager.setBlockToken(locatedBlock, BlockTokenIdentifier.AccessMode.WRITE); + + locatedBlock = BlockManager.newLocatedBlock( + block, file.getLastBlock(), null, -1); } finally { writeUnlock(); } @@ -5864,23 +5865,26 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock, // check the vadility of the block and lease holder name final INodeFile pendingFile = checkUCBlock(oldBlock, clientName); final BlockInfo lastBlock = pendingFile.getLastBlock(); - // when updating pipeline, the last block must be contiguous block - assert lastBlock instanceof BlockInfoContiguousUnderConstruction; - BlockInfoContiguousUnderConstruction blockinfo = - (BlockInfoContiguousUnderConstruction) lastBlock; + final BlockInfoUnderConstruction blockinfo = (BlockInfoUnderConstruction)lastBlock; // check new GS & length: this is not expected - if (newBlock.getGenerationStamp() <= blockinfo.getGenerationStamp() || - newBlock.getNumBytes() < blockinfo.getNumBytes()) { - String msg = "Update " + oldBlock + " (len = " + - blockinfo.getNumBytes() + ") to an older state: " + newBlock + - " (len = " + newBlock.getNumBytes() +")"; + if (newBlock.getGenerationStamp() <= lastBlock.getGenerationStamp()) { + final String msg = "Update " + oldBlock + " but the new block " + newBlock + + " does not have a larger generation stamp than the last block " + + lastBlock; + LOG.warn(msg); + throw new IOException(msg); + } + if (newBlock.getNumBytes() < lastBlock.getNumBytes()) { + final String msg = "Update " + oldBlock + " (size=" + + oldBlock.getNumBytes() + ") to a smaller size block " + newBlock + + " (size=" + newBlock.getNumBytes() + ")"; LOG.warn(msg); throw new IOException(msg); } // Update old block with the new generation stamp and new length - blockinfo.setNumBytes(newBlock.getNumBytes()); + lastBlock.setNumBytes(newBlock.getNumBytes()); blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp()); // find the DatanodeDescriptor objects diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 747f5283d7..9e94b90863 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -788,7 +788,7 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName) throws IOException { checkNNStartup(); - return namesystem.updateBlockForPipeline(block, clientName); + return namesystem.bumpBlockGenerationStamp(block, clientName); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 6f7dcb1478..0b09f3776c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.io.erasurecode.rawcoder.RSRawDecoder; import java.util.*; +import java.io.IOException; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -104,12 +105,17 @@ public static LocatedBlock constructInternalBlock(LocatedStripedBlock bg, final ExtendedBlock blk = constructInternalBlock( bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup); - return new LocatedBlock(blk, - new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, - new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, - new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, - bg.getStartOffset() + idxInBlockGroup * cellSize, bg.isCorrupt(), - null); + final long offset = bg.getStartOffset() + idxInBlockGroup * cellSize; + if (idxInReturnedLocs < bg.getLocations().length) { + return new LocatedBlock(blk, + new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, + new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, + new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, + offset, bg.isCorrupt(), null); + } else { + return new LocatedBlock(blk, null, null, null, + offset, bg.isCorrupt(), null); + } } /** @@ -823,4 +829,26 @@ public String toString() { return "(index=" + index + ", state =" + state + ")"; } } + + /** + * Check if the information such as IDs and generation stamps in block-i + * match block-0. + */ + public static void checkBlocks(ExtendedBlock block0, int i, + ExtendedBlock blocki) throws IOException { + + if (!blocki.getBlockPoolId().equals(block0.getBlockPoolId())) { + throw new IOException("Block pool IDs mismatched: block0=" + + block0 + ", block" + i + "=" + blocki); + } + if (blocki.getBlockId() - i != block0.getBlockId()) { + throw new IOException("Block IDs mismatched: block0=" + + block0 + ", block" + i + "=" + blocki); + } + if (blocki.getGenerationStamp() != block0.getGenerationStamp()) { + throw new IOException("Generation stamps mismatched: block0=" + + block0 + ", block" + i + "=" + blocki); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 558c45d8c3..82c078141f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1953,11 +1953,9 @@ public static Block addStripedBlockToFile(List dataNodes, * Because currently DFSStripedOutputStream does not support hflush/hsync, * tests can use this method to flush all the buffered data to DataNodes. */ - public static void writeAndFlushStripedOutputStream( - DFSStripedOutputStream out, int chunkSize) throws IOException { - // FSOutputSummer.BUFFER_NUM_CHUNKS == 9 - byte[] toWrite = new byte[chunkSize * 9 + 1]; - out.write(toWrite); + public static ExtendedBlock flushInternal(DFSStripedOutputStream out) + throws IOException { out.flushInternal(); + return out.getBlock(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index 4ad3b2e4e0..c232e13610 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -44,6 +43,8 @@ import org.junit.Before; import org.junit.Test; +import com.google.common.base.Preconditions; + public class TestDFSStripedOutputStreamWithFailure { public static final Log LOG = LogFactory.getLog( TestDFSStripedOutputStreamWithFailure.class); @@ -59,6 +60,9 @@ public class TestDFSStripedOutputStreamWithFailure { private static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK; private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_BLOCKS; + private static final int FLUSH_POS + = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1; + private final HdfsConfiguration conf = new HdfsConfiguration(); private MiniDFSCluster cluster; private DistributedFileSystem dfs; @@ -149,50 +153,53 @@ private void runTest(final String src, final int length, final int dnIndex) { cluster.startDataNodes(conf, 1, true, null, null); cluster.waitActive(); - runTest(new Path(dir, src), length, dnIndex); + runTest(new Path(dir, src), length, length/2, dnIndex); } catch(Exception e) { LOG.info("FAILED", e); Assert.fail(StringUtils.stringifyException(e)); } } - private void runTest(final Path p, final int length, + private void runTest(final Path p, final int length, final int killPos, final int dnIndex) throws Exception { - LOG.info("p=" + p + ", length=" + length + ", dnIndex=" + dnIndex); + LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos + + ", dnIndex=" + dnIndex); + Preconditions.checkArgument(killPos < length); + Preconditions.checkArgument(killPos > FLUSH_POS); final String fullPath = p.toString(); final AtomicInteger pos = new AtomicInteger(); final FSDataOutputStream out = dfs.create(p); - final AtomicBoolean killed = new AtomicBoolean(); - final Thread killer = new Thread(new Runnable() { - @Override - public void run() { - killDatanode(cluster, (DFSStripedOutputStream)out.getWrappedStream(), - dnIndex, pos); - killed.set(true); - } - }); - killer.start(); + final DFSStripedOutputStream stripedOut + = (DFSStripedOutputStream)out.getWrappedStream(); - final int mask = (1 << 16) - 1; + long oldGS = -1; + boolean killed = false; for(; pos.get() < length; ) { final int i = pos.getAndIncrement(); + if (i == killPos) { + final long gs = getGenerationStamp(stripedOut); + Assert.assertTrue(oldGS != -1); + Assert.assertEquals(oldGS, gs); + + killDatanode(cluster, stripedOut, dnIndex, pos); + killed = true; + } + write(out, i); - if ((i & mask) == 0) { - final long ms = 100; - LOG.info("i=" + i + " sleep " + ms); - Thread.sleep(ms); + + if (i == FLUSH_POS) { + oldGS = getGenerationStamp(stripedOut); } } - killer.join(10000); - Assert.assertTrue(killed.get()); out.close(); + Assert.assertTrue(killed); // check file length final FileStatus status = dfs.getFileStatus(p); Assert.assertEquals(length, status.getLen()); - checkData(dfs, fullPath, length, dnIndex); + checkData(dfs, fullPath, length, dnIndex, oldGS); } static void write(FSDataOutputStream out, int i) throws IOException { @@ -203,6 +210,14 @@ static void write(FSDataOutputStream out, int i) throws IOException { } } + static long getGenerationStamp(DFSStripedOutputStream out) + throws IOException { + final long gs = DFSTestUtil.flushInternal(out).getGenerationStamp(); + LOG.info("getGenerationStamp returns " + gs); + return gs; + + } + static DatanodeInfo getDatanodes(StripedDataStreamer streamer) { for(;;) { final DatanodeInfo[] datanodes = streamer.getNodes(); @@ -228,7 +243,7 @@ static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out, } static void checkData(DistributedFileSystem dfs, String src, int length, - int killedDnIndex) throws IOException { + int killedDnIndex, long oldGS) throws IOException { List> blockGroupList = new ArrayList<>(); LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(src, 0L); final int expectedNumGroup = (length - 1)/BLOCK_GROUP_SIZE + 1; @@ -236,6 +251,12 @@ static void checkData(DistributedFileSystem dfs, String src, int length, for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) { Assert.assertTrue(firstBlock instanceof LocatedStripedBlock); + + final long gs = firstBlock.getBlock().getGenerationStamp(); + final String s = "gs=" + gs + ", oldGS=" + oldGS; + LOG.info(s); + Assert.assertTrue(s, gs > oldGS); + LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup( (LocatedStripedBlock) firstBlock, CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS); @@ -247,7 +268,7 @@ static void checkData(DistributedFileSystem dfs, String src, int length, final boolean isLastGroup = group == blockGroupList.size() - 1; final int groupSize = !isLastGroup? BLOCK_GROUP_SIZE : length - (blockGroupList.size() - 1)*BLOCK_GROUP_SIZE; - final int numCellInGroup = (int)((groupSize - 1)/CELL_SIZE + 1); + final int numCellInGroup = (groupSize - 1)/CELL_SIZE + 1; final int lastCellIndex = (numCellInGroup - 1) % NUM_DATA_BLOCKS; final int lastCellSize = groupSize - (numCellInGroup - 1)*CELL_SIZE; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java index a35cbf480a..7876d1a1d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddStripedBlocks.java @@ -105,6 +105,14 @@ public void testAllocateBlockId() throws Exception { Assert.assertEquals(firstId + HdfsServerConstants.MAX_BLOCKS_IN_GROUP, secondId); } + private static void writeAndFlushStripedOutputStream( + DFSStripedOutputStream out, int chunkSize) throws IOException { + // FSOutputSummer.BUFFER_NUM_CHUNKS == 9 + byte[] toWrite = new byte[chunkSize * 9 + 1]; + out.write(toWrite); + DFSTestUtil.flushInternal(out); + } + @Test (timeout=60000) public void testAddStripedBlock() throws Exception { final Path file = new Path("/file1"); @@ -112,7 +120,7 @@ public void testAddStripedBlock() throws Exception { FSDataOutputStream out = null; try { out = dfs.create(file, (short) 1); - DFSTestUtil.writeAndFlushStripedOutputStream( + writeAndFlushStripedOutputStream( (DFSStripedOutputStream) out.getWrappedStream(), DFS_BYTES_PER_CHECKSUM_DEFAULT); @@ -190,7 +198,7 @@ public void testGetLocatedStripedBlocks() throws Exception { FSDataOutputStream out = null; try { out = dfs.create(file, (short) 1); - DFSTestUtil.writeAndFlushStripedOutputStream( + writeAndFlushStripedOutputStream( (DFSStripedOutputStream) out.getWrappedStream(), DFS_BYTES_PER_CHECKSUM_DEFAULT);