From 067ec8c2b14fb0929dc348b763383838e06ff8a5 Mon Sep 17 00:00:00 2001
From: Walter Su
Date: Thu, 27 Aug 2015 09:09:52 +0800
Subject: [PATCH] HDFS-8838. Erasure Coding: Tolerate datanode failures in
 DFSStripedOutputStream when the data length is small. Contributed by
 Tsz Wo Nicholas Sze.

---
 .../hadoop/util/ShutdownHookManager.java      |   6 +
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt      |   3 +
 .../apache/hadoop/hdfs/DFSOutputStream.java   |   6 +-
 .../hadoop/hdfs/DFSStripedOutputStream.java   |  56 +++--
 .../org/apache/hadoop/hdfs/DataStreamer.java  |  43 ++--
 .../hadoop/hdfs/StripedDataStreamer.java      | 138 +++++++++---
 .../server/namenode/FSDirWriteFileOp.java     |   3 +
 .../apache/hadoop/hdfs/MiniDFSCluster.java    |   2 +
 .../hdfs/TestDFSStripedOutputStream.java      |   2 +-
 ...TestDFSStripedOutputStreamWithFailure.java | 197 ++++++++++++++----
 ...tDFSStripedOutputStreamWithFailure000.java |  22 ++
 ...tDFSStripedOutputStreamWithFailure010.java |  22 ++
 12 files changed, 385 insertions(+), 115 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure000.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure010.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
index 989c96a8e3..85533dbffa 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ShutdownHookManager.java
@@ -187,4 +187,10 @@ public boolean isShutdownInProgress() {
     return shutdownInProgress.get();
   }
 
+  /**
+   * clear all registered shutdownHooks.
+   */
+  public void clearShutdownHooks() {
+    hooks.clear();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index dad997a06b..8b25e68c04 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -400,3 +400,6 @@
     HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
     blocklocations which doesn't satisfy BlockGroupSize.
     (Rakesh R via zhz)
+
+    HDFS-8838. Erasure Coding: Tolerate datanode failures in DFSStripedOutputStream
+    when the data length is small. (szetszwo via waltersu4549)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 00f3a6505c..1654a26ce2 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -406,13 +406,13 @@ protected synchronized void writeChunk(byte[] b, int offset, int len,
     if (currentPacket == null) {
       currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
           .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);
-      if (DFSClient.LOG.isDebugEnabled()) {
-        DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("WriteChunk allocating new packet seqno=" +
             currentPacket.getSeqno() +
             ", src=" + src +
             ", packetSize=" + packetSize +
             ", chunksPerPacket=" + chunksPerPacket +
-            ", bytesCurBlock=" + getStreamer().getBytesCurBlock());
+            ", bytesCurBlock=" + getStreamer().getBytesCurBlock() + ", " + this);
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 4ca8fe6b42..d3a054affe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -170,15 +170,18 @@ ExtendedBlock getBlockGroup() {
       }
       final boolean atBlockGroupBoundary = s0.getBytesCurBlock() == 0 && b0.getNumBytes() > 0;
+
       final ExtendedBlock block = new ExtendedBlock(b0);
-      long numBytes = b0.getNumBytes();
-      for (int i = 1; i < numDataBlocks; i++) {
+      long numBytes = atBlockGroupBoundary? b0.getNumBytes(): s0.getBytesCurBlock();
+      for (int i = 1; i < numAllBlocks; i++) {
         final StripedDataStreamer si = getStripedDataStreamer(i);
         final ExtendedBlock bi = si.getBlock();
         if (bi != null && bi.getGenerationStamp() > block.getGenerationStamp()) {
           block.setGenerationStamp(bi.getGenerationStamp());
         }
-        numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
+        if (i < numDataBlocks) {
+          numBytes += atBlockGroupBoundary? bi.getNumBytes(): si.getBytesCurBlock();
+        }
       }
       block.setNumBytes(numBytes);
       if (LOG.isDebugEnabled()) {
@@ -318,8 +321,7 @@ private synchronized StripedDataStreamer getCurrentStreamer() {
     return (StripedDataStreamer)streamer;
   }
 
-  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx)
-      throws IOException {
+  private synchronized StripedDataStreamer setCurrentStreamer(int newIdx) {
     // backup currentPacket for current streamer
     int oldIdx = streamers.indexOf(streamer);
     if (oldIdx >= 0) {
@@ -349,11 +351,11 @@ private static void encode(RawErasureEncoder encoder, int numData,
     }
   }
 
-  private void checkStreamers() throws IOException {
+  private void checkStreamers(boolean setExternalError) throws IOException {
     int count = 0;
     for(StripedDataStreamer s : streamers) {
       if (!s.isFailed()) {
-        if (s.getBlock() != null) {
+        if (setExternalError && s.getBlock() != null) {
           s.getErrorState().initExternalError();
         }
         count++;
@@ -369,11 +371,16 @@ private void handleStreamerFailure(String err,
-      Exception e) throws IOException {
+  private void handleStreamerFailure(String err, Exception e)
+      throws IOException {
+    handleStreamerFailure(err, e, true);
+  }
+
+  private void handleStreamerFailure(String err, Exception e,
+      boolean setExternalError) throws IOException {
     LOG.warn("Failed: " + err + ", " + this, e);
     getCurrentStreamer().setFailed(true);
-    checkStreamers();
+    checkStreamers(setExternalError);
     currentPacket = null;
   }
@@ -505,10 +512,10 @@ private long getCurrentSumBytes() {
     return sum;
   }
 
-  private void writeParityCellsForLastStripe() throws IOException {
+  private boolean generateParityCellsForLastStripe() {
     final long currentBlockGroupBytes = getCurrentSumBytes();
     if (currentBlockGroupBytes % stripeDataSize() == 0) {
-      return;
+      return false;
     }
 
     final int firstCellSize =
@@ -530,8 +537,7 @@
       }
       buffers[i].flip();
     }
-
-    writeParityCells();
+    return true;
   }
 
   void writeParityCells() throws IOException {
@@ -603,12 +609,14 @@ protected synchronized void closeImpl() throws IOException {
       // flush from all upper layers
       try {
         flushBuffer();
-        // if the last stripe is incomplete, generate and write parity cells
-        writeParityCellsForLastStripe();
-        enqueueAllCurrentPackets();
       } catch(Exception e) {
-        handleStreamerFailure("closeImpl", e);
+        handleStreamerFailure("flushBuffer " + getCurrentStreamer(), e);
       }
+      // if the last stripe is incomplete, generate and write parity cells
+      if (generateParityCellsForLastStripe()) {
+        writeParityCells();
+      }
+      enqueueAllCurrentPackets();
 
       for (int i = 0; i < numAllBlocks; i++) {
         final StripedDataStreamer s = setCurrentStreamer(i);
@@ -620,7 +628,7 @@ protected synchronized void closeImpl() throws IOException {
           // flush all data to Datanode
           flushInternal();
         } catch(Exception e) {
-          handleStreamerFailure("closeImpl", e);
+          handleStreamerFailure("flushInternal " + s, e, false);
         }
       }
     }
@@ -643,9 +651,13 @@ protected synchronized void closeImpl() throws IOException {
   private void enqueueAllCurrentPackets() throws IOException {
     int idx = streamers.indexOf(getCurrentStreamer());
     for(int i = 0; i < streamers.size(); i++) {
-      setCurrentStreamer(i);
-      if (currentPacket != null) {
-        enqueueCurrentPacket();
+      final StripedDataStreamer si = setCurrentStreamer(i);
+      if (!si.isFailed() && currentPacket != null) {
+        try {
+          enqueueCurrentPacket();
+        } catch (IOException e) {
+          handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e, false);
+        }
       }
     }
     setCurrentStreamer(idx);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index c78199ed30..bbcdd1fb83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -173,7 +173,7 @@ private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam)
     packets.clear();
   }
 
-  static class LastExceptionInStreamer {
+  class LastExceptionInStreamer {
     private IOException thrown;
 
     synchronized void set(Throwable t) {
@@ -191,7 +191,8 @@ synchronized void check(boolean resetToNull) throws IOException {
       if (thrown != null) {
         if (LOG.isTraceEnabled()) {
           // wrap and print the exception to know when the check is called
-          LOG.trace("Got Exception while checking", new Throwable(thrown));
+          LOG.trace("Got Exception while checking, " + DataStreamer.this,
+              new Throwable(thrown));
         }
         final IOException e = thrown;
         if (resetToNull) {
@@ -584,16 +585,13 @@ public void run() {
         }
 
         // get new block from namenode.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("stage=" + stage + ", " + this);
+        }
         if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Allocating new block " + this);
-          }
           setPipeline(nextBlockOutputStream());
           initDataStreaming();
         } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Append to block " + block);
-          }
           setupPipelineForAppendOrRecovery();
           if (streamerClosed) {
             continue;
           }
@@ -639,8 +637,7 @@ public void run() {
         }
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("DataStreamer block " + block +
-              " sending packet " + one);
+          LOG.debug(this + " sending " + one);
         }
 
         // write out data to remote datanode
@@ -1426,16 +1423,21 @@ static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
   /** update pipeline at the namenode */
   ExtendedBlock updatePipeline(long newGS) throws IOException {
     final ExtendedBlock newBlock = newBlock(block, newGS);
-    return callUpdatePipeline(block, newBlock);
+    return callUpdatePipeline(block, newBlock, nodes, storageIDs);
   }
 
-  ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock)
+  ExtendedBlock callUpdatePipeline(ExtendedBlock oldBlock, ExtendedBlock newBlock,
+      DatanodeInfo[] newNodes, String[] newStorageIDs)
       throws IOException {
     dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, newBlock,
-        nodes, storageIDs);
+        newNodes, newStorageIDs);
     return newBlock;
   }
 
+  int getNumBlockWriteRetry() {
+    return dfsClient.getConf().getNumBlockWriteRetry();
+  }
+
   /**
    * Open a DataStreamer to a DataNode so that it can be written to.
    * This happens when a file is created and each time a new block is allocated.
@@ -1446,7 +1448,7 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
     LocatedBlock lb = null;
     DatanodeInfo[] nodes = null;
     StorageType[] storageTypes = null;
-    int count = dfsClient.getConf().getNumBlockWriteRetry();
+    int count = getNumBlockWriteRetry();
     boolean success = false;
     ExtendedBlock oldBlock = block;
     do {
@@ -1502,7 +1504,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
     String firstBadLink = "";
     boolean checkRestart = false;
     if (LOG.isDebugEnabled()) {
-      LOG.debug("pipeline = " + Arrays.asList(nodes));
+      LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
     }
 
     // persist blocks on namenode on next flush
@@ -1574,7 +1576,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
       errorState.reset();
     } catch (IOException ie) {
       if (!errorState.isRestartingNode()) {
-        LOG.info("Exception in createBlockOutputStream", ie);
+        LOG.info("Exception in createBlockOutputStream " + this, ie);
       }
       if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
         LOG.info("Will fetch a new encryption key and retry, "
@@ -1649,7 +1651,7 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
     }
   }
 
-  protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
+  LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
       throws IOException {
     final DfsClientConf conf = dfsClient.getConf();
     int retries = conf.getNumBlockWriteLocateFollowingRetry();
@@ -1755,6 +1757,10 @@ DatanodeInfo[] getNodes() {
     return nodes;
   }
 
+  String[] getStorageIDs() {
+    return storageIDs;
+  }
+
   /**
    * return the token of the block
    *
@@ -1933,7 +1939,6 @@ void closeSocket() throws IOException {
 
   @Override
   public String toString() {
-    return (block == null? null: block.getLocalBlock())
-        + "@" + Arrays.toString(getNodes());
+    return block == null? "block==null": "" + block.getLocalBlock();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index f533bf9b10..a20caa5332 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -23,6 +23,7 @@
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hdfs.DFSStripedOutputStream.Coordinator;
@@ -39,6 +40,8 @@
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class extends {@link DataStreamer} to support writing striped blocks
  * to datanodes.
@@ -58,13 +61,13 @@ public class StripedDataStreamer extends DataStreamer {
    * @param <T> the queue entry type.
    */
   static abstract class ConcurrentPoll<T> {
-    private final MultipleBlockingQueue<T> queue;
+    final MultipleBlockingQueue<T> queue;
 
     ConcurrentPoll(MultipleBlockingQueue<T> queue) {
       this.queue = queue;
     }
 
-    T poll(final int i) throws IOException {
+    T poll(final int i) throws InterruptedIOException {
       for(;;) {
         synchronized(queue) {
           final T polled = queue.poll(i);
@@ -72,18 +75,17 @@ T poll(final int i) throws IOException {
             return polled;
           }
           if (isReady2Populate()) {
-            populate();
-            return queue.poll(i);
+            try {
+              populate();
+              return queue.poll(i);
+            } catch(IOException ioe) {
+              LOG.warn("Failed to populate, " + this, ioe);
+            }
           }
         }
 
         // sleep and then retry.
-        try {
-          Thread.sleep(100);
-        } catch(InterruptedException ie) {
-          throw DFSUtil.toInterruptedIOException(
-              "Sleep interrupted during poll", ie);
-        }
+        sleep(100, "poll");
       }
     }
 
@@ -94,6 +96,15 @@ boolean isReady2Populate() {
     abstract void populate() throws IOException;
   }
 
+  private static void sleep(long ms, String op) throws InterruptedIOException {
+    try {
+      Thread.sleep(ms);
+    } catch(InterruptedException ie) {
+      throw DFSUtil.toInterruptedIOException(
+          "Sleep interrupted during " + op, ie);
+    }
+  }
+
   private final Coordinator coordinator;
   private final int index;
   private volatile boolean failed;
@@ -135,11 +146,14 @@ protected void endBlock() {
   }
 
   @Override
-  protected LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
+  int getNumBlockWriteRetry() {
+    return 0;
+  }
+
+  @Override
+  LocatedBlock locateFollowingBlock(final DatanodeInfo[] excludedNodes)
       throws IOException {
-    final MultipleBlockingQueue<LocatedBlock> followingBlocks
-        = coordinator.getFollowingBlocks();
-    return new ConcurrentPoll<LocatedBlock>(followingBlocks) {
+    return new ConcurrentPoll<LocatedBlock>(coordinator.getFollowingBlocks()) {
       @Override
       boolean isReady2Populate() {
         return super.isReady2Populate()
@@ -194,18 +208,24 @@ void populate() throws IOException {
             si.endBlock();
             si.close(true);
           } else {
-            followingBlocks.offer(i, blocks[i]);
+            queue.offer(i, blocks[i]);
           }
         }
       }
     }.poll(index);
   }
 
+  @VisibleForTesting
+  LocatedBlock peekFollowingBlock() {
+    return coordinator.getFollowingBlocks().peek(index);
+  }
+
   @Override
   LocatedBlock updateBlockForPipeline() throws IOException {
-    final MultipleBlockingQueue<LocatedBlock> newBlocks
-        = coordinator.getNewBlocks();
-    return new ConcurrentPoll<LocatedBlock>(newBlocks) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("updateBlockForPipeline(), " + this);
+    }
+    return new ConcurrentPoll<LocatedBlock>(coordinator.getNewBlocks()) {
       @Override
       void populate() throws IOException {
         final ExtendedBlock bg = coordinator.getBlockGroup();
@@ -224,10 +244,22 @@ void populate() throws IOException {
             final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
                 null, null, null, -1, updated.isCorrupt(), null);
             lb.setBlockToken(updatedBlks[i].getBlockToken());
-            newBlocks.offer(i, lb);
+            queue.offer(i, lb);
           } else {
-            final LocatedBlock lb = coordinator.getFollowingBlocks().peek(i);
-            lb.getBlock().setGenerationStamp(newGS);
+            final MultipleBlockingQueue<LocatedBlock> followingBlocks
+                = coordinator.getFollowingBlocks();
+            synchronized(followingBlocks) {
+              final LocatedBlock lb = followingBlocks.peek(i);
+              if (lb != null) {
+                lb.getBlock().setGenerationStamp(newGS);
+                si.getErrorState().reset();
+                continue;
+              }
+            }
+
+            //streamer i just have polled the block, sleep and retry.
+ sleep(100, "updateBlockForPipeline, " + this); + i--; } } } @@ -236,21 +268,64 @@ void populate() throws IOException { @Override ExtendedBlock updatePipeline(final long newGS) throws IOException { - final MultipleBlockingQueue updateBlocks - = coordinator.getUpdateBlocks(); - return new ConcurrentPoll(updateBlocks) { + if (LOG.isDebugEnabled()) { + LOG.debug("updatePipeline(newGS=" + newGS + "), " + this); + } + return new ConcurrentPoll(coordinator.getUpdateBlocks()) { @Override void populate() throws IOException { + final MultipleBlockingQueue followingBlocks + = coordinator.getFollowingBlocks(); final ExtendedBlock bg = coordinator.getBlockGroup(); final ExtendedBlock newBG = newBlock(bg, newGS); - final ExtendedBlock updated = callUpdatePipeline(bg, newBG); - for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) { - StripedDataStreamer si = coordinator.getStripedDataStreamer(i); - if (si.isFailed()) { - continue; // skipping failed data streamer + + final int n = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + final DatanodeInfo[] newNodes = new DatanodeInfo[n]; + final String[] newStorageIDs = new String[n]; + for (int i = 0; i < n; i++) { + final StripedDataStreamer si = coordinator.getStripedDataStreamer(i); + DatanodeInfo[] nodes = si.getNodes(); + String[] storageIDs = si.getStorageIDs(); + if (nodes == null || storageIDs == null) { + synchronized(followingBlocks) { + final LocatedBlock lb = followingBlocks.peek(i); + if (lb != null) { + nodes = lb.getLocations(); + storageIDs = lb.getStorageIDs(); + } + } } + if (nodes != null && storageIDs != null) { + newNodes[i] = nodes[0]; + newStorageIDs[i] = storageIDs[0]; + } else { + //streamer i just have polled the block, sleep and retry. + sleep(100, "updatePipeline, " + this); + i--; + } + } + final ExtendedBlock updated = callUpdatePipeline(bg, newBG, newNodes, + newStorageIDs); + + for (int i = 0; i < n; i++) { + final StripedDataStreamer si = coordinator.getStripedDataStreamer(i); final ExtendedBlock bi = si.getBlock(); - updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp())); + if (bi != null) { + queue.offer(i, newBlock(bi, updated.getGenerationStamp())); + } else if (!si.isFailed()) { + synchronized(followingBlocks) { + final LocatedBlock lb = followingBlocks.peek(i); + if (lb != null) { + lb.getBlock().setGenerationStamp(newGS); + si.getErrorState().reset(); + continue; + } + } + + //streamer i just have polled the block, sleep and retry. + sleep(100, "updatePipeline, " + this); + i--; + } } } }.poll(index); @@ -258,7 +333,6 @@ void populate() throws IOException { @Override public String toString() { - return "#" + index + ": failed? " + Boolean.toString(failed).charAt(0) - + ", " + super.toString(); + return "#" + index + ": " + (failed? 
"failed, ": "") + super.toString(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java index b9466f6940..ffd8fbcddf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirWriteFileOp.java @@ -134,6 +134,9 @@ static void abandonBlock( FSNamesystem fsn = fsd.getFSNamesystem(); final INodeFile file = fsn.checkLease(src, holder, inode, fileId); Preconditions.checkState(file.isUnderConstruction()); + if (file.isStriped()) { + return; // do not abandon block for striped file + } Block localBlock = ExtendedBlock.getLocalBlock(b); fsd.writeLock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 65e26df971..59daba4ee0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -120,6 +120,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ToolRunner; import com.google.common.base.Joiner; @@ -1867,6 +1868,7 @@ public void shutdown(boolean deleteDfsDir, boolean closeFileSystem) { nameNode = null; } } + ShutdownHookManager.get().clearShutdownHooks(); if (base_dir != null) { if (deleteDfsDir) { base_dir.delete(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java index 5cab978a13..35e7e6d3b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.erasurecode.CodecUtil; -import org.apache.hadoop.io.erasurecode.rawcoder.RSRawEncoder; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; @@ -169,6 +168,7 @@ private byte getByte(long pos) { } private void testOneFile(String src, int writeBytes) throws IOException { + src += "_" + writeBytes; Path testPath = new Path(src); byte[] bytes = generateBytes(writeBytes); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java index f65d0c7500..e8e562b53a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -48,6 +49,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; @@ -71,6 +73,38 @@ public class TestDFSStripedOutputStreamWithFailure { private static final int FLUSH_POS = 9*DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT + 1; + static { + System.out.println("NUM_DATA_BLOCKS = " + NUM_DATA_BLOCKS); + System.out.println("NUM_PARITY_BLOCKS= " + NUM_PARITY_BLOCKS); + System.out.println("CELL_SIZE = " + CELL_SIZE + + " (=" + StringUtils.TraditionalBinaryPrefix.long2String(CELL_SIZE, "B", 2) + ")"); + System.out.println("BLOCK_SIZE = " + BLOCK_SIZE + + " (=" + StringUtils.TraditionalBinaryPrefix.long2String(BLOCK_SIZE, "B", 2) + ")"); + System.out.println("BLOCK_GROUP_SIZE = " + BLOCK_GROUP_SIZE + + " (=" + StringUtils.TraditionalBinaryPrefix.long2String(BLOCK_GROUP_SIZE, "B", 2) + ")"); + } + + static List newLengths() { + final List lengths = new ArrayList<>(); + lengths.add(FLUSH_POS + 2); + for(int b = 0; b <= 2; b++) { + for(int c = 0; c < STRIPES_PER_BLOCK*NUM_DATA_BLOCKS; c++) { + for(int delta = -1; delta <= 1; delta++) { + final int length = b*BLOCK_GROUP_SIZE + c*CELL_SIZE + delta; + System.out.println(lengths.size() + ": length=" + length + + ", (b, c, d) = (" + b + ", " + c + ", " + delta + ")"); + lengths.add(length); + } + } + } + return lengths; + } + + private static final List LENGTHS = newLengths(); + + static int getLength(int i) { + return LENGTHS.get(i); + } private MiniDFSCluster cluster; private DistributedFileSystem dfs; @@ -96,50 +130,49 @@ private static byte getByte(long pos) { return (byte)pos; } - private void initConf(Configuration conf){ + private HdfsConfiguration newHdfsConfiguration() { + final HdfsConfiguration conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + return conf; } - private void initConfWithBlockToken(Configuration conf) { - conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); - conf.setInt("ipc.client.connect.max.retries", 0); - // Set short retry timeouts so this test runs faster - conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); - } - - @Test(timeout=240000) - public void testDatanodeFailure() throws Exception { - final int length = NUM_DATA_BLOCKS*(BLOCK_SIZE - CELL_SIZE); - HdfsConfiguration conf = new HdfsConfiguration(); - initConf(conf); + void runTest(final int length) { + final HdfsConfiguration conf = newHdfsConfiguration(); for (int dn = 0; dn < 9; dn++) { try { setup(conf); - cluster.startDataNodes(conf, 1, true, null, null); - cluster.waitActive(); - runTest(new Path(dir, "file" + dn), length, length / 2, dn, false); + runTest(length, dn, false, conf); } catch (Exception e) { - LOG.error("failed, dn=" + dn + ", length=" + length); - throw e; + final String err = "failed, dn=" + dn + ", length=" + length + + StringUtils.stringifyException(e); + LOG.error(err); + Assert.fail(err); } finally { tearDown(); } } } + @Test(timeout=240000) + public void testDatanodeFailure56() throws Exception { + 
runTest(getLength(56)); + } + @Test(timeout=240000) public void testBlockTokenExpired() throws Exception { final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE); - HdfsConfiguration conf = new HdfsConfiguration(); - initConf(conf); - initConfWithBlockToken(conf); + final HdfsConfiguration conf = newHdfsConfiguration(); + + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0); + // Set short retry timeouts so this test runs faster + conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10); for (int dn = 0; dn < 9; dn += 2) { try { setup(conf); - cluster.startDataNodes(conf, 1, true, null, null); - cluster.waitActive(); - runTest(new Path(dir, "file" + dn), length, length / 2, dn, true); + runTest(length, dn, true, conf); } catch (Exception e) { LOG.error("failed, dn=" + dn + ", length=" + length); throw e; @@ -229,19 +262,41 @@ public void testAddBlockWhenNoSufficientParityNumOfNodes() throws IOException { } } - private void runTest(final Path p, final int length, final int killPos, - final int dnIndex, final boolean tokenExpire) throws Exception { - LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos - + ", dnIndex=" + dnIndex); - Preconditions.checkArgument(killPos < length); - Preconditions.checkArgument(killPos > FLUSH_POS); - final String fullPath = p.toString(); + private void runTest(final int length, final int dnIndex, + final boolean tokenExpire, final HdfsConfiguration conf) { + try { + runTest(length, length/2, dnIndex, tokenExpire, conf); + } catch(Exception e) { + LOG.info("FAILED", e); + Assert.fail(StringUtils.stringifyException(e)); + } + } + + private void runTest(final int length, final int killPos, + final int dnIndex, final boolean tokenExpire, + final HdfsConfiguration conf) throws Exception { + if (killPos <= FLUSH_POS) { + LOG.warn("killPos=" + killPos + " <= FLUSH_POS=" + FLUSH_POS + + ", length=" + length + ", dnIndex=" + dnIndex); + return; //skip test + } + Preconditions.checkArgument(length > killPos, + "length=%s <= killPos=%s", length, killPos); + + // start a datanode now, will kill one later + cluster.startDataNodes(conf, 1, true, null, null); + cluster.waitActive(); + + final Path p = new Path(dir, "dn" + dnIndex + "len" + length + "kill" + killPos); + final String fullPath = p.toString(); + LOG.info("fullPath=" + fullPath); - final NameNode nn = cluster.getNameNode(); - final BlockManager bm = nn.getNamesystem().getBlockManager(); - final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); if (tokenExpire) { + final NameNode nn = cluster.getNameNode(); + final BlockManager bm = nn.getNamesystem().getBlockManager(); + final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); + // set a short token lifetime (1 second) SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); } @@ -265,7 +320,7 @@ private void runTest(final Path p, final int length, final int killPos, waitTokenExpires(out); } - StripedFileTestUtil.killDatanode(cluster, stripedOut, dnIndex, pos); + killDatanode(cluster, stripedOut, dnIndex, pos); killed = true; } @@ -301,6 +356,40 @@ static long getGenerationStamp(DFSStripedOutputStream out) } + static DatanodeInfo getDatanodes(StripedDataStreamer streamer) { + for(;;) { + DatanodeInfo[] datanodes = streamer.getNodes(); + if (datanodes == null) { + // try peeking following block. 
+        final LocatedBlock lb = streamer.peekFollowingBlock();
+        if (lb != null) {
+          datanodes = lb.getLocations();
+        }
+      }
+
+      if (datanodes != null) {
+        Assert.assertEquals(1, datanodes.length);
+        Assert.assertNotNull(datanodes[0]);
+        return datanodes[0];
+      }
+
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        Assert.fail(StringUtils.stringifyException(ie));
+        return null;
+      }
+    }
+  }
+
+  static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
+      final int dnIndex, final AtomicInteger pos) {
+    final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
+    final DatanodeInfo datanode = getDatanodes(s);
+    LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
+    cluster.stopDataNode(datanode.getXferAddr());
+  }
+
   static void checkData(DistributedFileSystem dfs, String src, int length,
       int killedDnIndex, long oldGS) throws IOException {
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
@@ -314,7 +403,7 @@ static void checkData(DistributedFileSystem dfs, String src, int length,
       final long gs = firstBlock.getBlock().getGenerationStamp();
       final String s = "gs=" + gs + ", oldGS=" + oldGS;
       LOG.info(s);
-      Assert.assertTrue(s, gs > oldGS);
+      Assert.assertTrue(s, gs >= oldGS);
 
       LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
           (LocatedStripedBlock) firstBlock,
@@ -342,7 +431,7 @@ static void checkData(DistributedFileSystem dfs, String src, int length,
         final int numCellInBlock = (numCellInGroup - 1)/NUM_DATA_BLOCKS + (j <= lastCellIndex? 1: 0);
         final int blockSize = numCellInBlock*CELL_SIZE
-            + (isLastGroup && i == lastCellIndex? lastCellSize - CELL_SIZE: 0);
+            + (isLastGroup && j == lastCellIndex? lastCellSize - CELL_SIZE: 0);
 
         final byte[] blockBytes = new byte[blockSize];
         if (i < NUM_DATA_BLOCKS) {
@@ -352,7 +441,8 @@ static void checkData(DistributedFileSystem dfs, String src, int length,
         }
 
         final LocatedBlock lb = blockList.get(i);
-        LOG.info("XXX i=" + i + ", lb=" + lb);
+        LOG.info("i,j=" + i + ", " + j + ", numCellInBlock=" + numCellInBlock
+            + ", blockSize=" + blockSize + ", lb=" + lb);
         if (lb == null) {
           continue;
         }
@@ -410,4 +500,35 @@ private void waitTokenExpires(FSDataOutputStream out) throws IOException {
       }
     }
   }
+
+  public static abstract class TestBase {
+    static final long TIMEOUT = 240000;
+
+    int getBase() {
+      final String name = getClass().getSimpleName();
+      int i = name.length() - 1;
+      for(; i >= 0 && Character.isDigit(name.charAt(i)); i--);
+      return Integer.parseInt(name.substring(i + 1));
+    }
+
+    private final TestDFSStripedOutputStreamWithFailure test
+        = new TestDFSStripedOutputStreamWithFailure();
+    private void run(int offset) {
+      final int i = offset + getBase();
+      final int length = getLength(i);
+      System.out.println("Run test " + i + ", length=" + length);
+      test.runTest(length);
+    }
+
+    @Test(timeout=TIMEOUT) public void test0() {run(0);}
+    @Test(timeout=TIMEOUT) public void test1() {run(1);}
+    @Test(timeout=TIMEOUT) public void test2() {run(2);}
+    @Test(timeout=TIMEOUT) public void test3() {run(3);}
+    @Test(timeout=TIMEOUT) public void test4() {run(4);}
+    @Test(timeout=TIMEOUT) public void test5() {run(5);}
+    @Test(timeout=TIMEOUT) public void test6() {run(6);}
+    @Test(timeout=TIMEOUT) public void test7() {run(7);}
+    @Test(timeout=TIMEOUT) public void test8() {run(8);}
+    @Test(timeout=TIMEOUT) public void test9() {run(9);}
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure000.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure000.java
new file mode 100644
index 0000000000..b4fb1b89d2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure000.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure.TestBase;
+
+public class TestDFSStripedOutputStreamWithFailure000 extends TestBase {}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure010.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure010.java
new file mode 100644
index 0000000000..8489c3d629
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure010.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.hdfs.TestDFSStripedOutputStreamWithFailure.TestBase;
+
+public class TestDFSStripedOutputStreamWithFailure010 extends TestBase {}
\ No newline at end of file