From 1777608fa075a807c645619fda87cb8de1b0350c Mon Sep 17 00:00:00 2001
From: cnauroth
Date: Tue, 24 Nov 2015 09:39:21 -0800
Subject: [PATCH] HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally.
 Contributed by Wei-Chiu Chuang.

---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 .../hdfs/TestReplaceDatanodeOnFailure.java  | 64 +++++++++++++++----
 2 files changed, 53 insertions(+), 14 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ca2ed1500c..d39ed3f0ef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2368,6 +2368,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9433. DFS getEZForPath API on a non-existent file should throw
     FileNotFoundException (Rakesh R via umamahesh)
 
+    HDFS-6101. TestReplaceDatanodeOnFailure fails occasionally.
+    (Wei-Chiu Chuang via cnauroth)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
index d3510203eb..bbc447c9eb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hdfs;
 
+import com.google.common.base.Supplier;
+
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,7 +45,7 @@
  * This class tests that data nodes are correctly replaced on failure.
  */
 public class TestReplaceDatanodeOnFailure {
-  static final Log LOG = AppendTestUtil.LOG;
+  static final Log LOG = LogFactory.getLog(TestReplaceDatanodeOnFailure.class);
 
   static final String DIR = "/" + TestReplaceDatanodeOnFailure.class.getSimpleName() + "/";
   static final short REPLICATION = 3;
@@ -113,7 +117,8 @@ public void testDefaultPolicy() throws Exception {
   @Test
   public void testReplaceDatanodeOnFailure() throws Exception {
     final Configuration conf = new HdfsConfiguration();
-
+    // do not consider load factor when selecting a data node
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
     //always replace a datanode
     ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);
 
@@ -123,31 +128,40 @@ public void testReplaceDatanodeOnFailure() throws Exception {
         ).racks(racks).numDataNodes(REPLICATION).build();
 
     try {
+      cluster.waitActive();
       final DistributedFileSystem fs = cluster.getFileSystem();
       final Path dir = new Path(DIR);
-
-      final SlowWriter[] slowwriters = new SlowWriter[10];
+      final int NUM_WRITERS = 10;
+      final int FIRST_BATCH = 5;
+      final SlowWriter[] slowwriters = new SlowWriter[NUM_WRITERS];
       for(int i = 1; i <= slowwriters.length; i++) {
         //create slow writers in different speed
         slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), i*200L);
       }
 
-      for(SlowWriter s : slowwriters) {
-        s.start();
+      for(int i = 0; i < FIRST_BATCH; i++) {
+        slowwriters[i].start();
       }
 
       // Let slow writers write something.
-      // Some of them are too slow and will be not yet started.
-      sleepSeconds(1);
+      // Some of them are too slow and will be not yet started.
+      sleepSeconds(3);
 
       //start new datanodes
       cluster.startDataNodes(conf, 2, true, null, new String[]{RACK1, RACK1});
+      cluster.waitActive();
+      // wait for first block reports for up to 10 seconds
+      cluster.waitFirstBRCompleted(0, 10000);
+
       //stop an old datanode
-      cluster.stopDataNode(AppendTestUtil.nextInt(REPLICATION));
-
-      //Let the slow writer writes a few more seconds
-      //Everyone should have written something.
-      sleepSeconds(5);
+      MiniDFSCluster.DataNodeProperties dnprop = cluster.stopDataNode(
+          AppendTestUtil.nextInt(REPLICATION));
+
+      for(int i = FIRST_BATCH; i < slowwriters.length; i++) {
+        slowwriters[i].start();
+      }
+
+      waitForBlockReplication(slowwriters);
 
       //check replication and interrupt.
       for(SlowWriter s : slowwriters) {
@@ -181,6 +195,26 @@ public void testReplaceDatanodeOnFailure() throws Exception {
     }
   }
 
+  void waitForBlockReplication(final SlowWriter[] slowwriters) throws
+      TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        try {
+          for (SlowWriter s : slowwriters) {
+            if (s.out.getCurrentBlockReplication() < REPLICATION) {
+              return false;
+            }
+          }
+        } catch (IOException e) {
+          LOG.warn("IOException is thrown while getting the file block " +
+              "replication factor", e);
+          return false;
+        }
+        return true;
+      }
+    }, 1000, 10000);
+  }
+
   static void sleepSeconds(final int waittime) throws InterruptedException {
     LOG.info("Wait " + waittime + " seconds");
     Thread.sleep(waittime * 1000L);
@@ -191,7 +225,7 @@ static class SlowWriter extends Thread {
     final HdfsDataOutputStream out;
     final long sleepms;
     private volatile boolean running = true;
-
+
     SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms
         ) throws IOException {
       super(SlowWriter.class.getSimpleName() + ":" + filepath);
@@ -203,12 +237,14 @@ static class SlowWriter extends Thread {
     @Override
     public void run() {
       int i = 0;
+
       try {
         sleep(sleepms);
         for(; running; i++) {
           LOG.info(getName() + " writes " + i);
           out.write(i);
           out.hflush();
+          sleep(sleepms);
         }
       } catch(InterruptedException e) {