From a8b4d0ff283a0af1075aaa94904d4c6e63a9a3dd Mon Sep 17 00:00:00 2001
From: Jing Zhao
Date: Tue, 6 Oct 2015 10:56:10 -0700
Subject: [PATCH] HDFS-9180. Update excluded DataNodes in DFSStripedOutputStream
 based on failures in data streamers. Contributed by Jing Zhao.

---
 .../hadoop/hdfs/DFSStripedOutputStream.java   | 107 ++++++++++++------
 .../org/apache/hadoop/hdfs/DataStreamer.java  |  23 ++--
 .../hadoop/hdfs/StripedDataStreamer.java      |   2 +-
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt      |   3 +
 .../hadoop/hdfs/StripedFileTestUtil.java      |  58 ++++++----
 .../hdfs/TestDFSStripedOutputStream.java      |   6 +-
 ...TestDFSStripedOutputStreamWithFailure.java |  72 ++++++++----
 .../hdfs/TestReadStripedFileWithDecoding.java |   1 +
 .../hadoop/hdfs/TestWriteReadStripedFile.java |   4 +
 9 files changed, 189 insertions(+), 87 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 88e4b8f4aa..a8acf25330 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -377,17 +377,21 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private void replaceFailedStreamers() {
     assert streamers.size() == numAllBlocks;
+    final int currentIndex = getCurrentIndex();
+    assert currentIndex == 0;
     for (short i = 0; i < numAllBlocks; i++) {
       final StripedDataStreamer oldStreamer = getStripedDataStreamer(i);
       if (!oldStreamer.isHealthy()) {
+        LOG.info("replacing previously failed streamer " + oldStreamer);
         StripedDataStreamer streamer = new StripedDataStreamer(oldStreamer.stat,
             dfsClient, src, oldStreamer.progress,
             oldStreamer.checksum4WriteBlock, cachingStrategy, byteArrayManager,
             favoredNodes, i, coordinator);
         streamers.set(i, streamer);
         currentPackets[i] = null;
-        if (i == 0) {
+        if (i == currentIndex) {
           this.streamer = streamer;
+          this.currentPacket = null;
         }
         streamer.start();
       }
@@ -404,6 +408,18 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     }
   }
 
+  private DatanodeInfo[] getExcludedNodes() {
+    List<DatanodeInfo> excluded = new ArrayList<>();
+    for (StripedDataStreamer streamer : streamers) {
+      for (DatanodeInfo e : streamer.getExcludedNodes()) {
+        if (e != null) {
+          excluded.add(e);
+        }
+      }
+    }
+    return excluded.toArray(new DatanodeInfo[excluded.size()]);
+  }
+
   private void allocateNewBlock() throws IOException {
     if (currentBlockGroup != null) {
       for (int i = 0; i < numAllBlocks; i++) {
@@ -412,17 +428,17 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       }
     }
     failedStreamers.clear();
+    DatanodeInfo[] excludedNodes = getExcludedNodes();
+    LOG.debug("Excluding DataNodes when allocating new block: "
+        + Arrays.asList(excludedNodes));
+
     // replace failed streamers
     replaceFailedStreamers();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Allocating new block group. The previous block group: "
-          + currentBlockGroup);
-    }
-
-    // TODO collect excludedNodes from all the data streamers
-    final LocatedBlock lb = addBlock(null, dfsClient, src, currentBlockGroup,
-        fileId, favoredNodes);
+    LOG.debug("Allocating new block group. The previous block group: "
+        + currentBlockGroup);
+    final LocatedBlock lb = addBlock(excludedNodes, dfsClient, src,
+        currentBlockGroup, fileId, favoredNodes);
     assert lb.isStriped();
     if (lb.getLocations().length < numDataBlocks) {
       throw new IOException("Failed to get " + numDataBlocks
@@ -437,18 +453,17 @@
         numAllBlocks - numDataBlocks);
     for (int i = 0; i < blocks.length; i++) {
       StripedDataStreamer si = getStripedDataStreamer(i);
-      if (si.isHealthy()) { // skipping failed data streamer
-        if (blocks[i] == null) {
-          // Set exception and close streamer as there is no block locations
-          // found for the parity block.
-          LOG.warn("Failed to get block location for parity block, index=" + i);
-          si.getLastException().set(
-              new IOException("Failed to get following block, i=" + i));
-          si.getErrorState().setInternalError();
-          si.close(true);
-        } else {
-          coordinator.getFollowingBlocks().offer(i, blocks[i]);
-        }
+      assert si.isHealthy();
+      if (blocks[i] == null) {
+        // Set exception and close streamer as there is no block locations
+        // found for the parity block.
+        LOG.warn("Failed to get block location for parity block, index=" + i);
+        si.getLastException().set(
+            new IOException("Failed to get following block, i=" + i));
+        si.getErrorState().setInternalError();
+        si.close(true);
+      } else {
+        coordinator.getFollowingBlocks().offer(i, blocks[i]);
       }
     }
   }
 }
@@ -462,7 +477,6 @@ protected synchronized void writeChunk(byte[] bytes, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
     final int index = getCurrentIndex();
-    final StripedDataStreamer current = getCurrentStreamer();
     final int pos = cellBuffers.addTo(index, bytes, offset, len);
     final boolean cellFull = pos == cellSize;
@@ -472,6 +486,8 @@
     }
     currentBlockGroup.setNumBytes(currentBlockGroup.getNumBytes() + len);
+    // note: the current streamer can be refreshed after allocating a new block
+    final StripedDataStreamer current = getCurrentStreamer();
     if (current.isHealthy()) {
       try {
         super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
@@ -492,11 +508,11 @@
         cellBuffers.flipDataBuffers();
         writeParityCells();
         next = 0;
-        // check failure state for all the streamers. Bump GS if necessary
-        checkStreamerFailures();
 
         // if this is the end of the block group, end each internal block
         if (shouldEndBlockGroup()) {
+          flushAllInternals();
+          checkStreamerFailures();
           for (int i = 0; i < numAllBlocks; i++) {
             final StripedDataStreamer s = setCurrentStreamer(i);
             if (s.isHealthy()) {
               try {
               } catch (IOException ignored) {}
             }
           }
+        } else {
+          // check failure state for all the streamers. Bump GS if necessary
+          checkStreamerFailures();
         }
       }
       setCurrentStreamer(next);
@@ -522,11 +541,32 @@
     // no need to end block here
   }
 
+  /**
+   * @return whether the data streamer with the given index is streaming data.
+   * Note the streamer may not be in STREAMING stage if the block length is less
+   * than a stripe.
+   */
+  private boolean isStreamerWriting(int streamerIndex) {
+    final long length = currentBlockGroup == null ?
+        0 : currentBlockGroup.getNumBytes();
+    if (length == 0) {
+      return false;
+    }
+    if (streamerIndex >= numDataBlocks) {
+      return true;
+    }
+    final int numCells = (int) ((length - 1) / cellSize + 1);
+    return streamerIndex < numCells;
+  }
+
   private Set<StripedDataStreamer> markExternalErrorOnStreamers() {
     Set<StripedDataStreamer> healthySet = new HashSet<>();
-    for (StripedDataStreamer streamer : streamers) {
-      if (streamer.isHealthy() &&
-          streamer.getStage() == BlockConstructionStage.DATA_STREAMING) {
+    for (int i = 0; i < numAllBlocks; i++) {
+      final StripedDataStreamer streamer = getStripedDataStreamer(i);
+      if (streamer.isHealthy() && isStreamerWriting(i)) {
+        Preconditions.checkState(
+            streamer.getStage() == BlockConstructionStage.DATA_STREAMING,
+            "streamer: " + streamer);
         streamer.setExternalError();
         healthySet.add(streamer);
       }
@@ -541,12 +581,14 @@
    */
   private void checkStreamerFailures() throws IOException {
     Set<StripedDataStreamer> newFailed = checkStreamers();
-    if (newFailed.size() > 0) {
-      // for healthy streamers, wait till all of them have fetched the new block
-      // and flushed out all the enqueued packets.
-      flushAllInternals();
+    if (newFailed.size() == 0) {
+      return;
     }
-    // get all the current failed streamers after the flush
+
+    // for healthy streamers, wait till all of them have fetched the new block
+    // and flushed out all the enqueued packets.
+    flushAllInternals();
+    // recheck failed streamers again after the flush
     newFailed = checkStreamers();
     while (newFailed.size() > 0) {
       failedStreamers.addAll(newFailed);
@@ -629,6 +671,7 @@
     for (StripedDataStreamer streamer : healthyStreamers) {
       if (!coordinator.updateStreamerMap.containsKey(streamer)) {
         // close the streamer if it is too slow to create new connection
+        LOG.info("close the slow stream " + streamer);
         streamer.setStreamerAsClosed();
         failed.add(streamer);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 4260a9eb59..b0c5be6f55 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -359,7 +359,7 @@ class DataStreamer extends Daemon {
   private volatile String[] storageIDs = null;
   private final ErrorState errorState;
 
-  private BlockConstructionStage stage; // block construction stage
+  private volatile BlockConstructionStage stage; // block construction stage
   protected long bytesSent = 0; // number of bytes that've been sent
   private final boolean isLazyPersistFile;
@@ -588,7 +588,7 @@
         LOG.debug("stage=" + stage + ", " + this);
       }
       if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-        LOG.debug("Allocating new block");
+        LOG.debug("Allocating new block: " + this);
         setPipeline(nextBlockOutputStream());
         initDataStreaming();
       } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
@@ -748,7 +748,7 @@ void waitForAckedSeqno(long seqno) throws IOException {
     try (TraceScope ignored = dfsClient.getTracer().
         newScope("waitForAckedSeqno")) {
-      LOG.debug("Waiting for ack for: {}", seqno);
+      LOG.debug("{} waiting for ack for: {}", this, seqno);
       long begin = Time.monotonicNow();
       try {
         synchronized (dataQueue) {
@@ -1085,6 +1085,7 @@
       if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) {
         return false;
       }
+      LOG.debug("start process datanode/external error, {}", this);
       if (response != null) {
         LOG.info("Error Recovery for " + block + " waiting for responder to exit. ");
@@ -1307,10 +1308,12 @@
    * It keeps on trying until a pipeline is setup
    */
   private void setupPipelineForAppendOrRecovery() throws IOException {
-    // check number of datanodes
+    // Check number of datanodes. Note that if there is no healthy datanode,
+    // this must be internal error because we mark external error in striped
+    // outputstream only when all the streamers are in the DATA_STREAMING stage
    if (nodes == null || nodes.length == 0) {
      String msg = "Could not get block locations. " + "Source file \""
-          + src + "\" - Aborting...";
+          + src + "\" - Aborting..." + this;
      LOG.warn(msg);
      lastException.set(new IOException(msg));
      streamerClosed = true;
@@ -1462,8 +1465,9 @@
     return newBlock;
   }
 
-  private int getNumBlockWriteRetry() {
-    return dfsClient.getConf().getNumBlockWriteRetry();
+  DatanodeInfo[] getExcludedNodes() {
+    return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+        .keySet().toArray(new DatanodeInfo[0]);
   }
 
   /**
@@ -1483,10 +1487,7 @@
       errorState.resetInternalError();
       lastException.clear();
 
-      DatanodeInfo[] excluded =
-          excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
-          .keySet()
-          .toArray(new DatanodeInfo[0]);
+      DatanodeInfo[] excluded = getExcludedNodes();
       block = oldBlock;
       lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
       block = lb.getBlock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index c46700ad52..f74edc830f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -105,7 +105,7 @@ public class StripedDataStreamer extends DataStreamer {
       final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
       LOG.info("Excluding datanode " + badNode);
       excludedNodes.put(badNode, badNode);
-      throw new IOException("Unable to create new block.");
+      throw new IOException("Unable to create new block." + this);
     }
     return lb;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index c9f82c0e51..994911271a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -455,3 +455,6 @@
     DFSStripedOutputStream. (jing9 and Walter Su)
 
     HDFS-9185. Fix null tracer in ErasureCodingWorker. (Rakesh R via jing9)
+
+    HDFS-9180. Update excluded DataNodes in DFSStripedOutputStream based on failures
+    in data streamers. (jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 12453fafb3..cc6e7d37de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -44,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -58,6 +59,7 @@ public class StripedFileTestUtil {
   public static final short NUM_DATA_BLOCKS = (short) 6;
   public static final short NUM_PARITY_BLOCKS = (short) 3;
   public static final int BLOCK_STRIPED_CELL_SIZE = 64 * 1024;
+  public static final int BLOCK_STRIPE_SIZE = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS;
 
   static final int stripesPerBlock = 4;
   static final int blockSize = BLOCK_STRIPED_CELL_SIZE * stripesPerBlock;
@@ -113,7 +115,9 @@
         offset += target;
       }
       for (int i = 0; i < fileLength - startOffset; i++) {
-        assertEquals("Byte at " + (startOffset + i) + " is different, " + "the startOffset is " + startOffset, expected[startOffset + i], result[i]);
+        assertEquals("Byte at " + (startOffset + i) + " is different, "
+            + "the startOffset is " + startOffset, expected[startOffset + i],
+            result[i]);
       }
     }
   }
@@ -251,11 +255,17 @@
     return (short) (getRealDataBlockNum(numBytes) + NUM_PARITY_BLOCKS);
   }
 
+  public static void waitBlockGroupsReported(DistributedFileSystem fs,
+      String src) throws Exception {
+    waitBlockGroupsReported(fs, src, 0);
+  }
+
   /**
-   * Wait for all the internalBlocks of the blockGroups of the given file to be reported.
+   * Wait for all the internalBlocks of the blockGroups of the given file to be
+   * reported.
    */
-  public static void waitBlockGroupsReported(DistributedFileSystem fs, String src)
-      throws IOException, InterruptedException, TimeoutException {
+  public static void waitBlockGroupsReported(DistributedFileSystem fs,
+      String src, int numDeadDNs) throws Exception {
     boolean success;
     final int ATTEMPTS = 40;
     int count = 0;
@@ -265,11 +275,12 @@
       count++;
       LocatedBlocks lbs = fs.getClient().getLocatedBlocks(src, 0);
       for (LocatedBlock lb : lbs.getLocatedBlocks()) {
-        short expected = getRealTotalBlockNum((int) lb.getBlockSize());
+        short expected = (short) (getRealTotalBlockNum((int) lb.getBlockSize())
+            - numDeadDNs);
         int reported = lb.getLocations().length;
-        if (reported != expected){
+        if (reported < expected){
           success = false;
-          System.out.println("blockGroup " + lb.getBlock() + " of file " + src
+          LOG.info("blockGroup " + lb.getBlock() + " of file " + src
               + " has reported internalBlocks " + reported
               + " (desired " + expected + "); locations "
              + Joiner.on(' ').join(lb.getLocations()));
@@ -278,7 +289,7 @@
         }
       }
       if (success) {
-        System.out.println("All blockGroups of file " + src
+        LOG.info("All blockGroups of file " + src
             + " verified to have all internalBlocks.");
       }
     } while (!success && count < ATTEMPTS);
@@ -348,10 +359,9 @@
   }
 
   static void checkData(DistributedFileSystem dfs, Path srcPath, int length,
-      int[] killedDnIndex, long oldGS) throws IOException {
+      List<DatanodeInfo> killedList, List<Long> oldGSList) throws IOException {
     StripedFileTestUtil.verifyLength(dfs, srcPath, length);
 
-    Arrays.sort(killedDnIndex);
     List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
     LocatedBlocks lbs = dfs.getClient().getLocatedBlocks(srcPath.toString(), 0L,
         Long.MAX_VALUE);
@@ -361,10 +371,12 @@
     }
     assertEquals(expectedNumGroup, lbs.getLocatedBlocks().size());
 
+    int index = 0;
     for (LocatedBlock firstBlock : lbs.getLocatedBlocks()) {
       Assert.assertTrue(firstBlock instanceof LocatedStripedBlock);
 
       final long gs = firstBlock.getBlock().getGenerationStamp();
+      final long oldGS = oldGSList != null ? oldGSList.get(index++) : -1L;
       final String s = "gs=" + gs + ", oldGS=" + oldGS;
       LOG.info(s);
       Assert.assertTrue(s, gs >= oldGS);
@@ -389,6 +401,7 @@
       byte[][] dataBlockBytes = new byte[NUM_DATA_BLOCKS][];
       byte[][] parityBlockBytes = new byte[NUM_PARITY_BLOCKS][];
+      Set<Integer> checkSet = new HashSet<>();
       // for each block, use BlockReader to read data
       for (int i = 0; i < blockList.size(); i++) {
         final int j = i >= NUM_DATA_BLOCKS? 0: i;
@@ -417,19 +430,22 @@
           continue;
         }
 
-        if (Arrays.binarySearch(killedDnIndex, i) < 0) {
+        DatanodeInfo dn = blockList.get(i).getLocations()[0];
+        if (!killedList.contains(dn)) {
           final BlockReader blockReader = BlockReaderTestUtil.getBlockReader(
               dfs, lb, 0, block.getNumBytes());
           blockReader.readAll(blockBytes, 0, (int) block.getNumBytes());
           blockReader.close();
+          checkSet.add(i);
         }
       }
+      LOG.info("Internal blocks to check: " + checkSet);
 
       // check data
       final int groupPosInFile = group*BLOCK_GROUP_SIZE;
       for (int i = 0; i < dataBlockBytes.length; i++) {
         boolean killed = false;
-        if (Arrays.binarySearch(killedDnIndex, i) >= 0){
+        if (!checkSet.contains(i)) {
           killed = true;
         }
         final byte[] actual = dataBlockBytes[i];
@@ -453,15 +469,15 @@
       }
 
       // check parity
-      verifyParityBlocks(dfs.getConf(), lbs.getLocatedBlocks().get(group)
-          .getBlockSize(),
-          BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, killedDnIndex);
+      verifyParityBlocks(dfs.getConf(),
+          lbs.getLocatedBlocks().get(group).getBlockSize(),
+          BLOCK_STRIPED_CELL_SIZE, dataBlockBytes, parityBlockBytes, checkSet);
     }
   }
 
-  static void verifyParityBlocks(Configuration conf, final long size, final int cellSize,
-      byte[][] dataBytes, byte[][] parityBytes, int[] killedDnIndex) {
-    Arrays.sort(killedDnIndex);
+  static void verifyParityBlocks(Configuration conf, final long size,
+      final int cellSize, byte[][] dataBytes, byte[][] parityBytes,
+      Set<Integer> checkSet) {
     // verify the parity blocks
     int parityBlkSize = (int) StripedBlockUtil.getInternalBlockLength(
         size, cellSize, dataBytes.length, dataBytes.length);
@@ -482,9 +498,9 @@
         CodecUtil.createRSRawEncoder(conf, dataBytes.length, parityBytes.length);
     encoder.encode(dataBytes, expectedParityBytes);
     for (int i = 0; i < parityBytes.length; i++) {
-      if (Arrays.binarySearch(killedDnIndex, dataBytes.length + i) < 0){
-        Assert.assertArrayEquals("i=" + i + ", killedDnIndex=" + Arrays.toString(killedDnIndex),
-            expectedParityBytes[i], parityBytes[i]);
+      if (checkSet.contains(i + dataBytes.length)){
+        Assert.assertArrayEquals("i=" + i, expectedParityBytes[i],
+            parityBytes[i]);
       }
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
index d78e88b197..f1ce8ff0ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStream.java
@@ -18,11 +18,15 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.After;
@@ -151,6 +155,6 @@
     StripedFileTestUtil.waitBlockGroupsReported(fs, src);
 
     StripedFileTestUtil.checkData(fs, testPath, writeBytes,
-        new int[]{}, 0);
+        new ArrayList<DatanodeInfo>(), null);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index f6c25661df..7bd976f3fc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -24,23 +24,25 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.security.token.Token;
@@ -59,6 +61,9 @@ public class TestDFSStripedOutputStreamWithFailure {
   static {
     GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.ALL);
     GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+        .getLogger().setLevel(Level.ALL);
   }
 
   private static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
@@ -134,6 +139,7 @@
   private DistributedFileSystem dfs;
   private final Path dir = new Path("/"
       + TestDFSStripedOutputStreamWithFailure.class.getSimpleName());
+  private final Random random = new Random();
 
   private void setup(Configuration conf) throws IOException {
     final int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
@@ -153,7 +159,8 @@ private HdfsConfiguration newHdfsConfiguration() {
     final HdfsConfiguration conf = new HdfsConfiguration();
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 6000L);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     return conf;
   }
@@ -164,11 +171,31 @@
     runTest(getLength(56));
   }
 
+  @Test(timeout=240000)
+  public void testDatanodeFailureRandomLength() throws Exception {
+    int lenIndex = random.nextInt(LENGTHS.size());
+    LOG.info("run testMultipleDatanodeFailureRandomLength with length index: "
+        + lenIndex);
+    runTest(getLength(lenIndex));
+  }
+
   @Test(timeout=240000)
   public void testMultipleDatanodeFailure56() throws Exception {
     runTestWithMultipleFailure(getLength(56));
   }
 
+  /**
+   * Randomly pick a length and run tests with multiple data failures
+   * TODO: enable this later
+   */
+  //@Test(timeout=240000)
+  public void testMultipleDatanodeFailureRandomLength() throws Exception {
+    int lenIndex = random.nextInt(LENGTHS.size());
+    LOG.info("run testMultipleDatanodeFailureRandomLength with length index: "
+        + lenIndex);
+    runTestWithMultipleFailure(getLength(lenIndex));
+  }
+
   @Test(timeout=240000)
   public void testBlockTokenExpired() throws Exception {
     final int length = NUM_DATA_BLOCKS * (BLOCK_SIZE - CELL_SIZE);
@@ -208,11 +235,10 @@
     }
     cluster.restartNameNodes();
     cluster.triggerHeartbeats();
-    DatanodeInfo[] info = dfs.getClient().datanodeReport(
-        DatanodeReportType.LIVE);
+    DatanodeInfo[] info = dfs.getClient().datanodeReport(DatanodeReportType.LIVE);
     assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
     final Path dirFile = new Path(dir, "ecfile");
-    FSDataOutputStream out = null;
+    FSDataOutputStream out;
     try {
       out = dfs.create(dirFile, true);
       out.write("something".getBytes());
@@ -262,6 +288,7 @@
     final HdfsConfiguration conf = newHdfsConfiguration();
     for (int dn = 0; dn < 9; dn++) {
       try {
+        LOG.info("runTest: dn=" + dn + ", length=" + length);
        setup(conf);
        runTest(length, new int[]{length/2}, new int[]{dn}, false);
      } catch (Throwable e) {
@@ -277,10 +304,11 @@
   void runTestWithMultipleFailure(final int length) throws Exception {
     final HdfsConfiguration conf = newHdfsConfiguration();
     for(int i=0;i