HDFS-8202. Improve end to end stirpping file test to add erasure recovering test. Contributed by Xinwei Qin.
This commit is contained in:
parent
4a72be6e0e
commit
ba90c02853
@ -379,3 +379,6 @@
|
|||||||
|
|
||||||
HDFS-8769. Erasure coding: unit test for SequentialBlockGroupIdGenerator.
|
HDFS-8769. Erasure coding: unit test for SequentialBlockGroupIdGenerator.
|
||||||
(Rakesh R via waltersu4549)
|
(Rakesh R via waltersu4549)
|
||||||
|
|
||||||
|
HDFS-8202. Improve end to end stirpping file test to add erasure recovering
|
||||||
|
test. (Xinwei Qin via zhz)
|
||||||
|
@ -17,10 +17,13 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
|
import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -29,8 +32,11 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class StripedFileTestUtil {
|
public class StripedFileTestUtil {
|
||||||
|
public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
|
||||||
|
|
||||||
static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
|
static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS;
|
||||||
static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
|
static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS;
|
||||||
|
|
||||||
@ -193,4 +199,63 @@ static void assertSeekAndRead(FSDataInputStream fsdis, int pos,
|
|||||||
StripedFileTestUtil.getByte(pos + i), buf[i]);
|
StripedFileTestUtil.getByte(pos + i), buf[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
|
||||||
|
final int dnIndex, final AtomicInteger pos) {
|
||||||
|
final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
|
||||||
|
final DatanodeInfo datanode = getDatanodes(s);
|
||||||
|
LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
|
||||||
|
cluster.stopDataNode(datanode.getXferAddr());
|
||||||
|
}
|
||||||
|
|
||||||
|
static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
|
||||||
|
for(;;) {
|
||||||
|
final DatanodeInfo[] datanodes = streamer.getNodes();
|
||||||
|
if (datanodes != null) {
|
||||||
|
Assert.assertEquals(1, datanodes.length);
|
||||||
|
Assert.assertNotNull(datanodes[0]);
|
||||||
|
return datanodes[0];
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate n random and different numbers within
|
||||||
|
* specified non-negative integer range
|
||||||
|
* @param min minimum of the range
|
||||||
|
* @param max maximum of the range
|
||||||
|
* @param n number to be generated
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public static int[] randomArray(int min, int max, int n){
|
||||||
|
if (n > (max - min + 1) || max < min || min < 0 || max < 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
int[] result = new int[n];
|
||||||
|
for (int i = 0; i < n; i++) {
|
||||||
|
result[i] = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while(count < n) {
|
||||||
|
int num = (int) (Math.random() * (max - min)) + min;
|
||||||
|
boolean flag = true;
|
||||||
|
for (int j = 0; j < n; j++) {
|
||||||
|
if(num == result[j]){
|
||||||
|
flag = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(flag){
|
||||||
|
result[count] = num;
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -181,7 +181,7 @@ private void runTest(final Path p, final int length, final int killPos,
|
|||||||
waitTokenExpires(out);
|
waitTokenExpires(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
killDatanode(cluster, stripedOut, dnIndex, pos);
|
StripedFileTestUtil.killDatanode(cluster, stripedOut, dnIndex, pos);
|
||||||
killed = true;
|
killed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -217,30 +217,6 @@ static long getGenerationStamp(DFSStripedOutputStream out)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static DatanodeInfo getDatanodes(StripedDataStreamer streamer) {
|
|
||||||
for(;;) {
|
|
||||||
final DatanodeInfo[] datanodes = streamer.getNodes();
|
|
||||||
if (datanodes != null) {
|
|
||||||
Assert.assertEquals(1, datanodes.length);
|
|
||||||
Assert.assertNotNull(datanodes[0]);
|
|
||||||
return datanodes[0];
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(100);
|
|
||||||
} catch (InterruptedException ignored) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void killDatanode(MiniDFSCluster cluster, DFSStripedOutputStream out,
|
|
||||||
final int dnIndex, final AtomicInteger pos) {
|
|
||||||
final StripedDataStreamer s = out.getStripedDataStreamer(dnIndex);
|
|
||||||
final DatanodeInfo datanode = getDatanodes(s);
|
|
||||||
LOG.info("killDatanode " + dnIndex + ": " + datanode + ", pos=" + pos);
|
|
||||||
cluster.stopDataNode(datanode.getXferAddr());
|
|
||||||
}
|
|
||||||
|
|
||||||
static void checkData(DistributedFileSystem dfs, String src, int length,
|
static void checkData(DistributedFileSystem dfs, String src, int length,
|
||||||
int killedDnIndex, long oldGS) throws IOException {
|
int killedDnIndex, long oldGS) throws IOException {
|
||||||
List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
|
List<List<LocatedBlock>> blockGroupList = new ArrayList<>();
|
||||||
|
@ -20,10 +20,11 @@
|
|||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
@ -43,13 +44,21 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.StripedFileTestUtil.*;
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
|
||||||
|
|
||||||
public class TestReadStripedFileWithDecoding {
|
public class TestReadStripedFileWithDecoding {
|
||||||
static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
|
static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
|
||||||
|
|
||||||
private MiniDFSCluster cluster;
|
private MiniDFSCluster cluster;
|
||||||
private DistributedFileSystem fs;
|
private DistributedFileSystem fs;
|
||||||
|
private final int smallFileLength = blockSize * dataBlocks - 123;
|
||||||
|
private final int largeFileLength = blockSize * dataBlocks + 123;
|
||||||
|
private final int[] fileLengths = {smallFileLength, largeFileLength};
|
||||||
|
private final int[] dnFailureNums = {1, 2, 3};
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
@ -67,82 +76,64 @@ public void tearDown() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadWithDNFailure1() throws IOException {
|
|
||||||
testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadWithDNFailure2() throws IOException {
|
|
||||||
testReadWithDNFailure("/foo", cellSize * (dataBlocks + 2), cellSize * 5);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadWithDNFailure3() throws IOException {
|
|
||||||
testReadWithDNFailure("/foo", cellSize * dataBlocks, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delete a data block before reading. Verify the decoding works correctly.
|
* Shutdown tolerable number of Datanode before reading.
|
||||||
|
* Verify the decoding works correctly.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(timeout=300000)
|
||||||
public void testReadCorruptedData() throws IOException {
|
public void testReadWithDNFailure() throws IOException {
|
||||||
// create file
|
for (int fileLength : fileLengths) {
|
||||||
final Path file = new Path("/partially_deleted");
|
for (int dnFailureNum : dnFailureNums) {
|
||||||
final int length = cellSize * dataBlocks * 2;
|
try {
|
||||||
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
|
// setup a new cluster with no dead datanode
|
||||||
DFSTestUtil.writeFile(fs, file, bytes);
|
setup();
|
||||||
|
testReadWithDNFailure(fileLength, dnFailureNum);
|
||||||
// corrupt the first data block
|
} catch (IOException ioe) {
|
||||||
// find the corresponding data node
|
String fileType = fileLength < (blockSize * dataBlocks) ?
|
||||||
int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
|
"smallFile" : "largeFile";
|
||||||
Assert.assertNotEquals(-1, dnIndex);
|
LOG.error("Failed to read file with DN failure:"
|
||||||
// find the target block
|
+ " fileType = "+ fileType
|
||||||
LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
|
+ ", dnFailureNum = " + dnFailureNum);
|
||||||
.getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
|
} finally {
|
||||||
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
|
// tear down the cluster
|
||||||
cellSize, dataBlocks, parityBlocks);
|
tearDown();
|
||||||
// find the target block file
|
}
|
||||||
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
|
}
|
||||||
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
|
|
||||||
Assert.assertTrue("Block file does not exist", blkFile.exists());
|
|
||||||
// delete the block file
|
|
||||||
LOG.info("Deliberately removing file " + blkFile.getName());
|
|
||||||
Assert.assertTrue("Cannot remove file", blkFile.delete());
|
|
||||||
verifyRead(file, length, bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Corrupt the content of the data block before reading.
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testReadCorruptedData2() throws IOException {
|
|
||||||
// create file
|
|
||||||
final Path file = new Path("/partially_corrupted");
|
|
||||||
final int length = cellSize * dataBlocks * 2;
|
|
||||||
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
|
|
||||||
DFSTestUtil.writeFile(fs, file, bytes);
|
|
||||||
|
|
||||||
// corrupt the first data block
|
|
||||||
// find the first data node
|
|
||||||
int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
|
|
||||||
Assert.assertNotEquals(-1, dnIndex);
|
|
||||||
// find the first data block
|
|
||||||
LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
|
|
||||||
.getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
|
|
||||||
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
|
|
||||||
cellSize, dataBlocks, parityBlocks);
|
|
||||||
// find the first block file
|
|
||||||
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
|
|
||||||
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
|
|
||||||
Assert.assertTrue("Block file does not exist", blkFile.exists());
|
|
||||||
// corrupt the block file
|
|
||||||
LOG.info("Deliberately corrupting file " + blkFile.getName());
|
|
||||||
try (FileOutputStream out = new FileOutputStream(blkFile)) {
|
|
||||||
out.write("corruption".getBytes());
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
verifyRead(file, length, bytes);
|
/**
|
||||||
|
* Corrupt tolerable number of block before reading.
|
||||||
|
* Verify the decoding works correctly.
|
||||||
|
*/
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testReadCorruptedData() throws IOException {
|
||||||
|
for (int fileLength : fileLengths) {
|
||||||
|
for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
|
||||||
|
for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
|
||||||
|
String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
|
||||||
|
testReadWithBlockCorrupted(src, fileLength,
|
||||||
|
dataDelNum, parityDelNum, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete tolerable number of block before reading.
|
||||||
|
* Verify the decoding works correctly.
|
||||||
|
*/
|
||||||
|
@Test(timeout=300000)
|
||||||
|
public void testReadCorruptedDataByDeleting() throws IOException {
|
||||||
|
for (int fileLength : fileLengths) {
|
||||||
|
for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
|
||||||
|
for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
|
||||||
|
String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
|
||||||
|
testReadWithBlockCorrupted(src, fileLength,
|
||||||
|
dataDelNum, parityDelNum, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int findFirstDataNode(Path file, long length) throws IOException {
|
private int findFirstDataNode(Path file, long length) throws IOException {
|
||||||
@ -159,87 +150,45 @@ private int findFirstDataNode(Path file, long length) throws IOException {
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyRead(Path file, int length, byte[] expected)
|
private void verifyRead(Path testPath, int length, byte[] expected)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// pread
|
byte[] buffer = new byte[length + 100];
|
||||||
try (FSDataInputStream fsdis = fs.open(file)) {
|
StripedFileTestUtil.verifyLength(fs, testPath, length);
|
||||||
byte[] buf = new byte[length];
|
StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer);
|
||||||
int readLen = fsdis.read(0, buf, 0, buf.length);
|
StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer);
|
||||||
Assert.assertEquals("The fileSize of file should be the same to write size",
|
StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected,
|
||||||
length, readLen);
|
ByteBuffer.allocate(length + 100));
|
||||||
Assert.assertArrayEquals(expected, buf);
|
StripedFileTestUtil.verifySeek(fs, testPath, length);
|
||||||
}
|
|
||||||
|
|
||||||
// stateful read
|
|
||||||
ByteBuffer result = ByteBuffer.allocate(length);
|
|
||||||
ByteBuffer buf = ByteBuffer.allocate(1024);
|
|
||||||
int readLen = 0;
|
|
||||||
int ret;
|
|
||||||
try (FSDataInputStream in = fs.open(file)) {
|
|
||||||
while ((ret = in.read(buf)) >= 0) {
|
|
||||||
readLen += ret;
|
|
||||||
buf.flip();
|
|
||||||
result.put(buf);
|
|
||||||
buf.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Assert.assertEquals("The length of file should be the same to write size",
|
|
||||||
length, readLen);
|
|
||||||
Assert.assertArrayEquals(expected, result.array());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testReadWithDNFailure(String file, int fileSize,
|
private void testReadWithDNFailure(int fileLength, int dnFailureNum)
|
||||||
int startOffsetInFile) throws IOException {
|
throws IOException {
|
||||||
final int failedDNIdx = 2;
|
String fileType = fileLength < (blockSize * dataBlocks) ?
|
||||||
Path testPath = new Path(file);
|
"smallFile" : "largeFile";
|
||||||
final byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
|
String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
|
||||||
|
LOG.info("testReadWithDNFailure: file = " + src
|
||||||
|
+ ", fileSize = " + fileLength
|
||||||
|
+ ", dnFailureNum = " + dnFailureNum);
|
||||||
|
|
||||||
|
Path testPath = new Path(src);
|
||||||
|
final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
|
||||||
DFSTestUtil.writeFile(fs, testPath, bytes);
|
DFSTestUtil.writeFile(fs, testPath, bytes);
|
||||||
|
|
||||||
// shut down the DN that holds an internal data block
|
// shut down the DN that holds an internal data block
|
||||||
BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
|
BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
|
||||||
cellSize);
|
cellSize);
|
||||||
String name = (locs[0].getNames())[failedDNIdx];
|
for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
|
||||||
for (DataNode dn : cluster.getDataNodes()) {
|
String name = (locs[0].getNames())[failedDnIdx];
|
||||||
int port = dn.getXferPort();
|
for (DataNode dn : cluster.getDataNodes()) {
|
||||||
if (name.contains(Integer.toString(port))) {
|
int port = dn.getXferPort();
|
||||||
dn.shutdown();
|
if (name.contains(Integer.toString(port))) {
|
||||||
break;
|
dn.shutdown();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pread
|
// check file length, pread, stateful read and seek
|
||||||
try (FSDataInputStream fsdis = fs.open(testPath)) {
|
verifyRead(testPath, fileLength, bytes);
|
||||||
byte[] buf = new byte[fileSize];
|
|
||||||
int readLen = fsdis.read(startOffsetInFile, buf, 0, buf.length);
|
|
||||||
Assert.assertEquals("The fileSize of file should be the same to write size",
|
|
||||||
fileSize - startOffsetInFile, readLen);
|
|
||||||
|
|
||||||
byte[] expected = new byte[readLen];
|
|
||||||
System.arraycopy(bytes, startOffsetInFile, expected, 0,
|
|
||||||
fileSize - startOffsetInFile);
|
|
||||||
|
|
||||||
for (int i = startOffsetInFile; i < fileSize; i++) {
|
|
||||||
Assert.assertEquals("Byte at " + i + " should be the same",
|
|
||||||
expected[i - startOffsetInFile], buf[i - startOffsetInFile]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// stateful read
|
|
||||||
ByteBuffer result = ByteBuffer.allocate(fileSize);
|
|
||||||
ByteBuffer buf = ByteBuffer.allocate(1024);
|
|
||||||
int readLen = 0;
|
|
||||||
int ret;
|
|
||||||
try (FSDataInputStream in = fs.open(testPath)) {
|
|
||||||
while ((ret = in.read(buf)) >= 0) {
|
|
||||||
readLen += ret;
|
|
||||||
buf.flip();
|
|
||||||
result.put(buf);
|
|
||||||
buf.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Assert.assertEquals("The length of file should be the same to write size",
|
|
||||||
fileSize, readLen);
|
|
||||||
Assert.assertArrayEquals(bytes, result.array());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -279,21 +228,8 @@ public void testReportBadBlock() throws IOException {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
// do stateful read
|
// do stateful read
|
||||||
ByteBuffer result = ByteBuffer.allocate(length);
|
StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes,
|
||||||
ByteBuffer buf = ByteBuffer.allocate(1024);
|
ByteBuffer.allocate(1024));
|
||||||
int readLen = 0;
|
|
||||||
int ret;
|
|
||||||
try (FSDataInputStream in = fs.open(file)) {
|
|
||||||
while ((ret = in.read(buf)) >= 0) {
|
|
||||||
readLen += ret;
|
|
||||||
buf.flip();
|
|
||||||
result.put(buf);
|
|
||||||
buf.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Assert.assertEquals("The length of file should be the same to write size",
|
|
||||||
length, readLen);
|
|
||||||
Assert.assertArrayEquals(bytes, result.array());
|
|
||||||
|
|
||||||
// check whether the corruption has been reported to the NameNode
|
// check whether the corruption has been reported to the NameNode
|
||||||
final FSNamesystem ns = cluster.getNamesystem();
|
final FSNamesystem ns = cluster.getNamesystem();
|
||||||
@ -341,4 +277,82 @@ public void testInvalidateBlock() throws IOException {
|
|||||||
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
|
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test reading a file with some blocks(data blocks or parity blocks or both)
|
||||||
|
* deleted or corrupted.
|
||||||
|
* @param src file path
|
||||||
|
* @param fileLength file length
|
||||||
|
* @param dataBlkDelNum the deleted or corrupted number of data blocks.
|
||||||
|
* @param parityBlkDelNum the deleted or corrupted number of parity blocks.
|
||||||
|
* @param deleteBlockFile whether block file is deleted or corrupted.
|
||||||
|
* true is to delete the block file.
|
||||||
|
* false is to corrupt the content of the block file.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void testReadWithBlockCorrupted(String src, int fileLength,
|
||||||
|
int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
|
||||||
|
throws IOException {
|
||||||
|
LOG.info("testReadWithBlockCorrupted: file = " + src
|
||||||
|
+ ", dataBlkDelNum = " + dataBlkDelNum
|
||||||
|
+ ", parityBlkDelNum = " + parityBlkDelNum
|
||||||
|
+ ", deleteBlockFile? " + deleteBlockFile);
|
||||||
|
int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
|
||||||
|
Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive",
|
||||||
|
dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
|
||||||
|
Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
|
||||||
|
"should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks);
|
||||||
|
|
||||||
|
// write a file with the length of writeLen
|
||||||
|
Path srcPath = new Path(src);
|
||||||
|
final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
|
||||||
|
DFSTestUtil.writeFile(fs, srcPath, bytes);
|
||||||
|
|
||||||
|
// delete or corrupt some blocks
|
||||||
|
corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile);
|
||||||
|
|
||||||
|
// check the file can be read after some blocks were deleted
|
||||||
|
verifyRead(srcPath, fileLength, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void corruptBlocks(Path srcPath, int dataBlkDelNum,
|
||||||
|
int parityBlkDelNum, boolean deleteBlockFile) throws IOException {
|
||||||
|
int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
|
||||||
|
|
||||||
|
LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath);
|
||||||
|
LocatedStripedBlock lastBlock =
|
||||||
|
(LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
|
||||||
|
|
||||||
|
int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
|
||||||
|
dataBlkDelNum);
|
||||||
|
Assert.assertNotNull(delDataBlkIndices);
|
||||||
|
int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks,
|
||||||
|
dataBlocks + parityBlocks, parityBlkDelNum);
|
||||||
|
Assert.assertNotNull(delParityBlkIndices);
|
||||||
|
|
||||||
|
int[] delBlkIndices = new int[recoverBlkNum];
|
||||||
|
System.arraycopy(delDataBlkIndices, 0,
|
||||||
|
delBlkIndices, 0, delDataBlkIndices.length);
|
||||||
|
System.arraycopy(delParityBlkIndices, 0,
|
||||||
|
delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);
|
||||||
|
|
||||||
|
ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
|
||||||
|
for (int i = 0; i < recoverBlkNum; i++) {
|
||||||
|
delBlocks[i] = StripedBlockUtil
|
||||||
|
.constructInternalBlock(lastBlock.getBlock(),
|
||||||
|
cellSize, dataBlocks, delBlkIndices[i]);
|
||||||
|
if (deleteBlockFile) {
|
||||||
|
// delete the block file
|
||||||
|
cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]);
|
||||||
|
} else {
|
||||||
|
// corrupt the block file
|
||||||
|
cluster.corruptBlockOnDataNodes(delBlocks[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException {
|
||||||
|
return fs.getClient().getLocatedBlocks(filePath.toString(),
|
||||||
|
0, Long.MAX_VALUE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,162 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.blockSize;
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.cellSize;
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.dataBlocks;
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.numDNs;
|
||||||
|
import static org.apache.hadoop.hdfs.StripedFileTestUtil.parityBlocks;
|
||||||
|
|
||||||
|
public class TestWriteStripedFileWithFailure {
|
||||||
|
public static final Log LOG = LogFactory
|
||||||
|
.getLog(TestReadStripedFileWithMissingBlocks.class);
|
||||||
|
private static MiniDFSCluster cluster;
|
||||||
|
private static FileSystem fs;
|
||||||
|
private static Configuration conf = new HdfsConfiguration();
|
||||||
|
private final int smallFileLength = blockSize * dataBlocks - 123;
|
||||||
|
private final int largeFileLength = blockSize * dataBlocks + 123;
|
||||||
|
private final int[] fileLengths = {smallFileLength, largeFileLength};
|
||||||
|
|
||||||
|
public void setup() throws IOException {
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
|
||||||
|
cluster.getFileSystem().getClient().createErasureCodingZone("/",
|
||||||
|
null, cellSize);
|
||||||
|
fs = cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void tearDown() throws IOException {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test writing file with some Datanodes failure
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testWriteStripedFileWithDNFailure() throws IOException {
|
||||||
|
for (int fileLength : fileLengths) {
|
||||||
|
for (int dataDelNum = 1; dataDelNum < 4; dataDelNum++) {
|
||||||
|
for (int parityDelNum = 0; (dataDelNum+parityDelNum) < 4; parityDelNum++) {
|
||||||
|
try {
|
||||||
|
// setup a new cluster with no dead datanode
|
||||||
|
setup();
|
||||||
|
writeFileWithDNFailure(fileLength, dataDelNum, parityDelNum);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
String fileType = fileLength < (blockSize * dataBlocks) ?
|
||||||
|
"smallFile" : "largeFile";
|
||||||
|
LOG.error("Failed to write file with DN failure:"
|
||||||
|
+ " fileType = "+ fileType
|
||||||
|
+ ", dataDelNum = " + dataDelNum
|
||||||
|
+ ", parityDelNum = " + parityDelNum);
|
||||||
|
throw ioe;
|
||||||
|
} finally {
|
||||||
|
// tear down the cluster
|
||||||
|
tearDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test writing a file with shutting down some DNs(data DNs or parity DNs or both).
|
||||||
|
* @param fileLength file length
|
||||||
|
* @param dataDNFailureNum the shutdown number of data DNs
|
||||||
|
* @param parityDNFailureNum the shutdown number of parity DNs
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void writeFileWithDNFailure(int fileLength,
|
||||||
|
int dataDNFailureNum, int parityDNFailureNum) throws IOException {
|
||||||
|
String fileType = fileLength < (blockSize * dataBlocks) ?
|
||||||
|
"smallFile" : "largeFile";
|
||||||
|
String src = "/dnFailure_" + dataDNFailureNum + "_" + parityDNFailureNum
|
||||||
|
+ "_" + fileType;
|
||||||
|
LOG.info("writeFileWithDNFailure: file = " + src
|
||||||
|
+ ", fileType = " + fileType
|
||||||
|
+ ", dataDNFailureNum = " + dataDNFailureNum
|
||||||
|
+ ", parityDNFailureNum = " + parityDNFailureNum);
|
||||||
|
|
||||||
|
Path srcPath = new Path(src);
|
||||||
|
final AtomicInteger pos = new AtomicInteger();
|
||||||
|
final FSDataOutputStream out = fs.create(srcPath);
|
||||||
|
final DFSStripedOutputStream stripedOut
|
||||||
|
= (DFSStripedOutputStream)out.getWrappedStream();
|
||||||
|
|
||||||
|
int[] dataDNFailureIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
|
||||||
|
dataDNFailureNum);
|
||||||
|
Assert.assertNotNull(dataDNFailureIndices);
|
||||||
|
int[] parityDNFailureIndices = StripedFileTestUtil.randomArray(dataBlocks,
|
||||||
|
dataBlocks + parityBlocks, dataDNFailureNum);
|
||||||
|
Assert.assertNotNull(parityDNFailureIndices);
|
||||||
|
|
||||||
|
int[] failedDataNodes = new int[dataDNFailureNum + parityDNFailureNum];
|
||||||
|
System.arraycopy(dataDNFailureIndices, 0, failedDataNodes,
|
||||||
|
0, dataDNFailureIndices.length);
|
||||||
|
System.arraycopy(parityDNFailureIndices, 0, failedDataNodes,
|
||||||
|
dataDNFailureIndices.length, parityDNFailureIndices.length);
|
||||||
|
|
||||||
|
final int killPos = fileLength/2;
|
||||||
|
for (; pos.get() < fileLength; ) {
|
||||||
|
final int i = pos.getAndIncrement();
|
||||||
|
if (i == killPos) {
|
||||||
|
for(int failedDn : failedDataNodes) {
|
||||||
|
StripedFileTestUtil.killDatanode(cluster, stripedOut, failedDn, pos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
write(out, i);
|
||||||
|
}
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
// make sure the expected number of Datanode have been killed
|
||||||
|
int dnFailureNum = dataDNFailureNum + parityDNFailureNum;
|
||||||
|
Assert.assertEquals(cluster.getDataNodes().size(), numDNs - dnFailureNum);
|
||||||
|
|
||||||
|
byte[] smallBuf = new byte[1024];
|
||||||
|
byte[] largeBuf = new byte[fileLength + 100];
|
||||||
|
final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
|
||||||
|
StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
|
||||||
|
StripedFileTestUtil.verifySeek(fs, srcPath, fileLength);
|
||||||
|
StripedFileTestUtil.verifyStatefulRead(fs, srcPath, fileLength, expected,
|
||||||
|
smallBuf);
|
||||||
|
StripedFileTestUtil.verifyPread(fs, srcPath, fileLength, expected, largeBuf);
|
||||||
|
|
||||||
|
// delete the file
|
||||||
|
fs.delete(srcPath, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
void write(FSDataOutputStream out, int i) throws IOException {
|
||||||
|
try {
|
||||||
|
out.write(StripedFileTestUtil.getByte(i));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IOException("Failed at i=" + i, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user