HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally. Contributed by Wei-Chiu Chuang.
This commit is contained in:
parent
28dfe721b8
commit
1777608fa0
@ -2368,6 +2368,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
HDFS-9433. DFS getEZForPath API on a non-existent file should throw FileNotFoundException
|
HDFS-9433. DFS getEZForPath API on a non-existent file should throw FileNotFoundException
|
||||||
(Rakesh R via umamahesh)
|
(Rakesh R via umamahesh)
|
||||||
|
|
||||||
|
HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally.
|
||||||
|
(Wei-Chiu Chuang via cnauroth)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -17,10 +17,14 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
@ -41,7 +45,7 @@
|
|||||||
* This class tests that data nodes are correctly replaced on failure.
|
* This class tests that data nodes are correctly replaced on failure.
|
||||||
*/
|
*/
|
||||||
public class TestReplaceDatanodeOnFailure {
|
public class TestReplaceDatanodeOnFailure {
|
||||||
static final Log LOG = AppendTestUtil.LOG;
|
static final Log LOG = LogFactory.getLog(TestReplaceDatanodeOnFailure.class);
|
||||||
|
|
||||||
static final String DIR = "/" + TestReplaceDatanodeOnFailure.class.getSimpleName() + "/";
|
static final String DIR = "/" + TestReplaceDatanodeOnFailure.class.getSimpleName() + "/";
|
||||||
static final short REPLICATION = 3;
|
static final short REPLICATION = 3;
|
||||||
@ -113,7 +117,8 @@ public void testDefaultPolicy() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testReplaceDatanodeOnFailure() throws Exception {
|
public void testReplaceDatanodeOnFailure() throws Exception {
|
||||||
final Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
// do not consider load factor when selecting a data node
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
|
||||||
//always replace a datanode
|
//always replace a datanode
|
||||||
ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
|
ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
|
||||||
|
|
||||||
@ -123,31 +128,40 @@ public void testReplaceDatanodeOnFailure() throws Exception {
|
|||||||
).racks(racks).numDataNodes(REPLICATION).build();
|
).racks(racks).numDataNodes(REPLICATION).build();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
final DistributedFileSystem fs = cluster.getFileSystem();
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
final Path dir = new Path(DIR);
|
final Path dir = new Path(DIR);
|
||||||
|
final int NUM_WRITERS = 10;
|
||||||
final SlowWriter[] slowwriters = new SlowWriter[10];
|
final int FIRST_BATCH = 5;
|
||||||
|
final SlowWriter[] slowwriters = new SlowWriter[NUM_WRITERS];
|
||||||
for(int i = 1; i <= slowwriters.length; i++) {
|
for(int i = 1; i <= slowwriters.length; i++) {
|
||||||
//create slow writers in different speed
|
//create slow writers in different speed
|
||||||
slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), i*200L);
|
slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), i*200L);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(SlowWriter s : slowwriters) {
|
for(int i = 0; i < FIRST_BATCH; i++) {
|
||||||
s.start();
|
slowwriters[i].start();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Let slow writers write something.
|
// Let slow writers write something.
|
||||||
// Some of them are too slow and will be not yet started.
|
// Some of them are too slow and will be not yet started.
|
||||||
sleepSeconds(1);
|
sleepSeconds(3);
|
||||||
|
|
||||||
//start new datanodes
|
//start new datanodes
|
||||||
cluster.startDataNodes(conf, 2, true, null, new String[]{RACK1, RACK1});
|
cluster.startDataNodes(conf, 2, true, null, new String[]{RACK1, RACK1});
|
||||||
|
cluster.waitActive();
|
||||||
|
// wait for first block reports for up to 10 seconds
|
||||||
|
cluster.waitFirstBRCompleted(0, 10000);
|
||||||
|
|
||||||
//stop an old datanode
|
//stop an old datanode
|
||||||
cluster.stopDataNode(AppendTestUtil.nextInt(REPLICATION));
|
MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(
|
||||||
|
AppendTestUtil.nextInt(REPLICATION));
|
||||||
//Let the slow writer writes a few more seconds
|
|
||||||
//Everyone should have written something.
|
for(int i = FIRST_BATCH; i < slowwriters.length; i++) {
|
||||||
sleepSeconds(5);
|
slowwriters[i].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
waitForBlockReplication(slowwriters);
|
||||||
|
|
||||||
//check replication and interrupt.
|
//check replication and interrupt.
|
||||||
for(SlowWriter s : slowwriters) {
|
for(SlowWriter s : slowwriters) {
|
||||||
@ -181,6 +195,26 @@ public void testReplaceDatanodeOnFailure() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void waitForBlockReplication(final SlowWriter[] slowwriters) throws
|
||||||
|
TimeoutException, InterruptedException {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override public Boolean get() {
|
||||||
|
try {
|
||||||
|
for (SlowWriter s : slowwriters) {
|
||||||
|
if (s.out.getCurrentBlockReplication() < REPLICATION) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("IOException is thrown while getting the file block " +
|
||||||
|
"replication factor", e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}, 1000, 10000);
|
||||||
|
}
|
||||||
|
|
||||||
static void sleepSeconds(final int waittime) throws InterruptedException {
|
static void sleepSeconds(final int waittime) throws InterruptedException {
|
||||||
LOG.info("Wait " + waittime + " seconds");
|
LOG.info("Wait " + waittime + " seconds");
|
||||||
Thread.sleep(waittime * 1000L);
|
Thread.sleep(waittime * 1000L);
|
||||||
@ -191,7 +225,7 @@ static class SlowWriter extends Thread {
|
|||||||
final HdfsDataOutputStream out;
|
final HdfsDataOutputStream out;
|
||||||
final long sleepms;
|
final long sleepms;
|
||||||
private volatile boolean running = true;
|
private volatile boolean running = true;
|
||||||
|
|
||||||
SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms
|
SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
super(SlowWriter.class.getSimpleName() + ":" + filepath);
|
super(SlowWriter.class.getSimpleName() + ":" + filepath);
|
||||||
@ -203,12 +237,14 @@ static class SlowWriter extends Thread {
|
|||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
sleep(sleepms);
|
sleep(sleepms);
|
||||||
for(; running; i++) {
|
for(; running; i++) {
|
||||||
LOG.info(getName() + " writes " + i);
|
LOG.info(getName() + " writes " + i);
|
||||||
out.write(i);
|
out.write(i);
|
||||||
out.hflush();
|
out.hflush();
|
||||||
|
|
||||||
sleep(sleepms);
|
sleep(sleepms);
|
||||||
}
|
}
|
||||||
} catch(InterruptedException e) {
|
} catch(InterruptedException e) {
|
||||||
|
Loading…
Reference in New Issue
Block a user