HDFS-16061. DFTestUtil.waitReplication can produce false positives (#3095). Contributed by Ahmed Hussein.

Reviewed-by: Jim Brennan <jbrennan@apache.org>
Signed-off-by: Ayush Saxena <ayushsaxena@apache.org>
This commit is contained in:
Ahmed Hussein 2021-06-19 23:52:47 -05:00 committed by Ayush Saxena
parent 8d241482bd
commit 92ade1f6f9
2 changed files with 35 additions and 24 deletions

View File

@ -799,41 +799,48 @@ public String[] getFileNames(String topDir) {
/** /**
* Wait for the given file to reach the given replication factor. * Wait for the given file to reach the given replication factor.
* @throws TimeoutException if we fail to sufficiently replicate the file *
* @param fs the defined filesystem.
* @param fileName being written.
* @param replFactor desired replication
* @throws IOException getting block locations
* @throws InterruptedException during sleep
* @throws TimeoutException if 40 seconds passed before reaching the desired
* replication.
*/ */
public static void waitReplication(FileSystem fs, Path fileName, short replFactor) public static void waitReplication(FileSystem fs, Path fileName,
short replFactor)
throws IOException, InterruptedException, TimeoutException { throws IOException, InterruptedException, TimeoutException {
boolean correctReplFactor; boolean correctReplFactor;
final int ATTEMPTS = 40; int attempt = 0;
int count = 0;
do { do {
correctReplFactor = true; correctReplFactor = true;
if (attempt++ > 0) {
Thread.sleep(1000);
}
BlockLocation locs[] = fs.getFileBlockLocations( BlockLocation locs[] = fs.getFileBlockLocations(
fs.getFileStatus(fileName), 0, Long.MAX_VALUE); fs.getFileStatus(fileName), 0, Long.MAX_VALUE);
count++; for (int currLoc = 0; currLoc < locs.length; currLoc++) {
for (int j = 0; j < locs.length; j++) { String[] hostnames = locs[currLoc].getNames();
String[] hostnames = locs[j].getNames();
if (hostnames.length != replFactor) { if (hostnames.length != replFactor) {
LOG.info(
"Block {} of file {} has replication factor {} "
+ "(desired {}); locations: {}",
currLoc, fileName, hostnames.length, replFactor,
Joiner.on(' ').join(hostnames));
correctReplFactor = false; correctReplFactor = false;
System.out.println("Block " + j + " of file " + fileName
+ " has replication factor " + hostnames.length
+ " (desired " + replFactor + "); locations "
+ Joiner.on(' ').join(hostnames));
Thread.sleep(1000);
break; break;
} }
} }
if (correctReplFactor) { } while (!correctReplFactor && attempt < 40);
System.out.println("All blocks of file " + fileName
+ " verified to have replication factor " + replFactor);
}
} while (!correctReplFactor && count < ATTEMPTS);
if (count == ATTEMPTS) { if (!correctReplFactor) {
throw new TimeoutException("Timed out waiting for " + fileName + throw new TimeoutException("Timed out waiting for file ["
" to reach " + replFactor + " replicas"); + fileName + "] to reach [" + replFactor + "] replicas");
} }
LOG.info("All blocks of file {} verified to have replication factor {}",
fileName, replFactor);
} }
/** delete directory and everything underneath it.*/ /** delete directory and everything underneath it.*/

View File

@ -20,13 +20,17 @@
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout;
/** /**
* The Balancer ensures that it disperses RPCs to the NameNode * The Balancer ensures that it disperses RPCs to the NameNode
* in order to avoid NN's RPC queue saturation. * in order to avoid NN's RPC queue saturation.
*/ */
public class TestBalancerRPCDelay { public class TestBalancerRPCDelay {
@Rule
public Timeout globalTimeout = Timeout.seconds(100);
private TestBalancer testBalancer; private TestBalancer testBalancer;
@ -43,12 +47,12 @@ public void teardown() throws Exception {
} }
} }
@Test(timeout=100000) @Test
public void testBalancerRPCDelayQps3() throws Exception { public void testBalancerRPCDelayQps3() throws Exception {
testBalancer.testBalancerRPCDelay(3); testBalancer.testBalancerRPCDelay(3);
} }
@Test(timeout=100000) @Test
public void testBalancerRPCDelayQpsDefault() throws Exception { public void testBalancerRPCDelayQpsDefault() throws Exception {
testBalancer.testBalancerRPCDelay( testBalancer.testBalancerRPCDelay(
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT); DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);