HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the blocklocations which doesn't satisfy BlockGroupSize.
This commit is contained in:
parent
1d37a88121
commit
b57c9a35f7
@ -397,3 +397,6 @@
|
|||||||
|
|
||||||
HDFS-8854. Erasure coding: add ECPolicy to replace schema+cellSize in
|
HDFS-8854. Erasure coding: add ECPolicy to replace schema+cellSize in
|
||||||
hadoop-hdfs. (Walter Su via zhz)
|
hadoop-hdfs. (Walter Su via zhz)
|
||||||
|
|
||||||
|
HDFS-8220. Erasure Coding: StripedDataStreamer fails to handle the
|
||||||
|
blocklocations which doesn't satisfy BlockGroupSize. (Rakesh R via zhz)
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
|
import org.apache.hadoop.hdfs.DFSStripedOutputStream.MultipleBlockingQueue;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
@ -167,18 +168,33 @@ void populate() throws IOException {
|
|||||||
|
|
||||||
final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
|
final LocatedBlock lb = StripedDataStreamer.super.locateFollowingBlock(
|
||||||
excludedNodes);
|
excludedNodes);
|
||||||
|
if (lb.getLocations().length < HdfsConstants.NUM_DATA_BLOCKS) {
|
||||||
|
throw new IOException(
|
||||||
|
"Failed to get datablocks number of nodes from namenode: blockGroupSize= "
|
||||||
|
+ (HdfsConstants.NUM_DATA_BLOCKS + HdfsConstants.NUM_PARITY_BLOCKS)
|
||||||
|
+ ", blocks.length= " + lb.getLocations().length);
|
||||||
|
}
|
||||||
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
|
final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
|
||||||
(LocatedStripedBlock)lb,
|
(LocatedStripedBlock)lb,
|
||||||
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
|
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
|
||||||
|
|
||||||
for (int i = 0; i < blocks.length; i++) {
|
for (int i = 0; i < blocks.length; i++) {
|
||||||
if (!coordinator.getStripedDataStreamer(i).isFailed()) {
|
StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
|
||||||
if (blocks[i] == null) {
|
if (si.isFailed()) {
|
||||||
getLastException().set(
|
continue; // skipping failed data streamer
|
||||||
new IOException("Failed to get following block, i=" + i));
|
}
|
||||||
} else {
|
if (blocks[i] == null) {
|
||||||
followingBlocks.offer(i, blocks[i]);
|
// Set exception and close streamer as there is no block locations
|
||||||
}
|
// found for the parity block.
|
||||||
|
LOG.warn("Failed to get block location for parity block, index="
|
||||||
|
+ i);
|
||||||
|
si.getLastException().set(
|
||||||
|
new IOException("Failed to get following block, i=" + i));
|
||||||
|
si.setFailed(true);
|
||||||
|
si.endBlock();
|
||||||
|
si.close(true);
|
||||||
|
} else {
|
||||||
|
followingBlocks.offer(i, blocks[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -199,7 +215,11 @@ void populate() throws IOException {
|
|||||||
.parseStripedBlockGroup((LocatedStripedBlock) updated,
|
.parseStripedBlockGroup((LocatedStripedBlock) updated,
|
||||||
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
|
BLOCK_STRIPED_CELL_SIZE, NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS);
|
||||||
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
|
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
|
||||||
final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
|
StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
|
||||||
|
if (si.isFailed()) {
|
||||||
|
continue; // skipping failed data streamer
|
||||||
|
}
|
||||||
|
final ExtendedBlock bi = si.getBlock();
|
||||||
if (bi != null) {
|
if (bi != null) {
|
||||||
final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
|
final LocatedBlock lb = new LocatedBlock(newBlock(bi, newGS),
|
||||||
null, null, null, -1, updated.isCorrupt(), null);
|
null, null, null, -1, updated.isCorrupt(), null);
|
||||||
@ -225,7 +245,11 @@ void populate() throws IOException {
|
|||||||
final ExtendedBlock newBG = newBlock(bg, newGS);
|
final ExtendedBlock newBG = newBlock(bg, newGS);
|
||||||
final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
|
final ExtendedBlock updated = callUpdatePipeline(bg, newBG);
|
||||||
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
|
for (int i = 0; i < NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; i++) {
|
||||||
final ExtendedBlock bi = coordinator.getStripedDataStreamer(i).getBlock();
|
StripedDataStreamer si = coordinator.getStripedDataStreamer(i);
|
||||||
|
if (si.isFailed()) {
|
||||||
|
continue; // skipping failed data streamer
|
||||||
|
}
|
||||||
|
final ExtendedBlock bi = si.getBlock();
|
||||||
updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
|
updateBlocks.offer(i, newBlock(bi, updated.getGenerationStamp()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -33,6 +35,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
@ -40,6 +43,7 @@
|
|||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
|
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
@ -145,6 +149,86 @@ public void testBlockTokenExpired() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 90000)
|
||||||
|
public void testAddBlockWhenNoSufficientDataBlockNumOfNodes()
|
||||||
|
throws IOException {
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
try {
|
||||||
|
setup(conf);
|
||||||
|
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
|
||||||
|
// shutdown few datanodes to avoid getting sufficient data blocks number
|
||||||
|
// of datanodes
|
||||||
|
int killDns = dataNodes.size() / 2;
|
||||||
|
int numDatanodes = dataNodes.size() - killDns;
|
||||||
|
for (int i = 0; i < killDns; i++) {
|
||||||
|
cluster.stopDataNode(i);
|
||||||
|
}
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
cluster.triggerHeartbeats();
|
||||||
|
DatanodeInfo[] info = dfs.getClient().datanodeReport(
|
||||||
|
DatanodeReportType.LIVE);
|
||||||
|
assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
|
||||||
|
final Path dirFile = new Path(dir, "ecfile");
|
||||||
|
FSDataOutputStream out = null;
|
||||||
|
try {
|
||||||
|
out = dfs.create(dirFile, true);
|
||||||
|
out.write("something".getBytes());
|
||||||
|
out.flush();
|
||||||
|
out.close();
|
||||||
|
Assert.fail("Failed to validate available dns against blkGroupSize");
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
// expected
|
||||||
|
GenericTestUtils.assertExceptionContains("Failed: the number of "
|
||||||
|
+ "remaining blocks = 5 < the number of data blocks = 6", ioe);
|
||||||
|
DFSStripedOutputStream dfsout = (DFSStripedOutputStream) out
|
||||||
|
.getWrappedStream();
|
||||||
|
|
||||||
|
// get leading streamer and verify the last exception
|
||||||
|
StripedDataStreamer datastreamer = dfsout.getStripedDataStreamer(0);
|
||||||
|
try {
|
||||||
|
datastreamer.getLastException().check(true);
|
||||||
|
Assert.fail("Failed to validate available dns against blkGroupSize");
|
||||||
|
} catch (IOException le) {
|
||||||
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
"Failed to get datablocks number of nodes from"
|
||||||
|
+ " namenode: blockGroupSize= 9, blocks.length= "
|
||||||
|
+ numDatanodes, le);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
tearDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 90000)
|
||||||
|
public void testAddBlockWhenNoSufficientParityNumOfNodes() throws IOException {
|
||||||
|
HdfsConfiguration conf = new HdfsConfiguration();
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
|
||||||
|
try {
|
||||||
|
setup(conf);
|
||||||
|
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
|
||||||
|
// shutdown few data nodes to avoid writing parity blocks
|
||||||
|
int killDns = (NUM_PARITY_BLOCKS - 1);
|
||||||
|
int numDatanodes = dataNodes.size() - killDns;
|
||||||
|
for (int i = 0; i < killDns; i++) {
|
||||||
|
cluster.stopDataNode(i);
|
||||||
|
}
|
||||||
|
cluster.restartNameNodes();
|
||||||
|
cluster.triggerHeartbeats();
|
||||||
|
DatanodeInfo[] info = dfs.getClient().datanodeReport(
|
||||||
|
DatanodeReportType.LIVE);
|
||||||
|
assertEquals("Mismatches number of live Dns ", numDatanodes, info.length);
|
||||||
|
Path srcPath = new Path(dir, "testAddBlockWhenNoSufficientParityNodes");
|
||||||
|
int fileLength = HdfsConstants.BLOCK_STRIPED_CELL_SIZE - 1000;
|
||||||
|
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
|
||||||
|
DFSTestUtil.writeFile(dfs, srcPath, new String(expected));
|
||||||
|
StripedFileTestUtil.verifySeek(dfs, srcPath, fileLength);
|
||||||
|
} finally {
|
||||||
|
tearDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void runTest(final Path p, final int length, final int killPos,
|
private void runTest(final Path p, final int length, final int killPos,
|
||||||
final int dnIndex, final boolean tokenExpire) throws Exception {
|
final int dnIndex, final boolean tokenExpire) throws Exception {
|
||||||
LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos
|
LOG.info("p=" + p + ", length=" + length + ", killPos=" + killPos
|
||||||
|
Loading…
Reference in New Issue
Block a user