From b57c9a35f76612a02e16497ea92fbc3f4dc524ec Mon Sep 17 00:00:00 2001
From: Zhe Zhang
Date: Fri, 14 Aug 2015 15:16:22 -0700
Subject: [PATCH] HDFS-8220. Erasure Coding: StripedDataStreamer fails to
 handle the blocklocations which doesn't satisfy BlockGroupSize.

---
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt      |  3 +
 .../hadoop/hdfs/StripedDataStreamer.java      | 42 ++++++++--
 ...TestDFSStripedOutputStreamWithFailure.java | 84 +++++++++++++++++++
 3 files changed, 120 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index b752d5bdeb..dad997a06b 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -397,3 +397,6 @@
 
     HDFS-8854. Erasure coding: add ECPolicy to replace schema+cellSize in
     hadoop-hdfs. (Walter Su via zhz)
+
+    HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
+    blocklocations which doesn't satisfy BlockGroupSize. (Rakesh R via zhz)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 2d51dc4358..f533bf9b10 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -167,18 +168,33 @@ void populate() throws IOException {
         final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
             excludedNodes);
+        if (lb.getLocations().length < HdfsConstants.NUM_DATA_BLOCKS) {
+          throw new IOException(
+              "Failed to get datablocks number of nodes from namenode: blockGroupSize= "
+                  + (HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS)
+                  + ", blocks.length= " + lb.getLocations().length);
+        }
         final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
             (LocatedStripedBlock)lb,
             BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
 
         for (int i = 0; i < blocks.length; i++) {
-          if (!coordinator.getStripedDataStreamer(i).isFailed()) {
-            if (blocks[i] == null) {
-              getLastException().set(
-                  new IOException("Failed to get following block, i=" + i));
-            } else {
-              followingBlocks.offer(i, blocks[i]);
-            }
+          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+          if (si.isFailed()) {
+            continue; // skipping failed data streamer
+          }
+          if (blocks[i] == null) {
+            // Set exception and close streamer as there is no block locations
+            // found for the parity block.
+            LOG.warn("Failed to get block location for parity block, index="
+                + i);
+            si.getLastException().set(
+                new IOException("Failed to get following block, i=" + i));
+            si.setFailed(true);
+            si.endBlock();
+            si.close(true);
+          } else {
+            followingBlocks.offer(i, blocks[i]);
           }
         }
       }
 
@@ -199,7 +215,11 @@ void populate() throws IOException {
             .parseStripedBlockGroup((LocatedStripedBlock) updated,
                 BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
         for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-          final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+          if (si.isFailed()) {
+            continue; // skipping failed data streamer
+          }
+          final ExtendedBlock bi = si.getBlock();
           if (bi != null) {
             final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
                 null, null, null, -1, updated.isCorrupt(), null);
@@ -225,7 +245,11 @@ void populate() throws IOException {
         final ExtendedBlock newBG = newBlock(bg, newGS);
         final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
         for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
-          final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
+          StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
+          if (si.isFailed()) {
+            continue; // skipping failed data streamer
+          }
+          final ExtendedBlock bi = si.getBlock();
           updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
         }
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index fed9f16764..f65d0c7500 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -33,6 +35,7 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -40,6 +43,7 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.security.token.Token;
@@ -145,6 +149,86 @@ public void testBlockTokenExpired() throws Exception {
     }
   }
 
+  @Test(timeout = 90000)
+  public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
+      throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    try {
+      setup(conf);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+      // shutdown few datanodes to avoid getting sufficient data blocks number
+      // of datanodes
+      int killDns = dataNodes.size() / 2;
+      int numDatanodes = dataNodes.size() - killDns;
+      for (int i = 0; i < killDns; i++) {
+        cluster.stopDataNode(i);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+      DatanodeInfo[] info = dfs.getClient().datanodeReport(
+          DatanodeReportType.LIVE);
+      assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
+      final Path dirFile = new Path(dir, "ecfile");
+      FSDataOutputStream out = null;
+      try {
+        out = dfs.create(dirFile, true);
+        out.write("something".getBytes());
+        out.flush();
+        out.close();
+        Assert.fail("Failed to validate available dns against blkGroupSize");
+      } catch (IOException ioe) {
+        // expected
+        GenericTestUtils.assertExceptionContains("Failed: the number of "
+            + "remaining blocks = 5 < the number of data blocks = 6", ioe);
+        DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
+            .getWrappedStream();
+
+        // get leading streamer and verify the last exception
+        StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
+        try {
+          datastreamer.getLastException().check(true);
+          Assert.fail("Failed to validate available dns against blkGroupSize");
+        } catch (IOException le) {
+          GenericTestUtils.assertExceptionContains(
+              "Failed to get datablocks number of nodes from"
+                  + " namenode: blockGroupSize= 9, blocks.length= "
+                  + numDatanodes, le);
+        }
+      }
+    } finally {
+      tearDown();
+    }
+  }
+
+  @Test(timeout = 90000)
+  public void testAddBlockWhenNoSufficientParityNumOfNodes() throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    try {
+      setup(conf);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+      // shutdown few data nodes to avoid writing parity blocks
+      int killDns = (NUM_PARITY_BLOCKS - 1);
+      int numDatanodes = dataNodes.size() - killDns;
+      for (int i = 0; i < killDns; i++) {
+        cluster.stopDataNode(i);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+      DatanodeInfo[] info = dfs.getClient().datanodeReport(
+          DatanodeReportType.LIVE);
+      assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
+      Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
+      int fileLength = HdfsConstants.BLOCK_STRIPED_CELL_SIZE - 1000;
+      final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+      DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
+      StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
+    } finally {
+      tearDown();
+    }
+  }
+
   private void runTest(final Path p, final int length, final int killPos,
       final int dnIndex, final boolean tokenExpire) throws Exception {
     LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos