HDFS-2929. Stress test and fixes for block synchronization. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1292494 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-02-22 20:31:52 +00:00
parent c78f6aa299
commit c14912785d
7 changed files with 493 additions and 87 deletions

View File

@ -220,3 +220,5 @@ HDFS-2586. Add protobuf service and implementation for HAServiceProtocol. (sures
HDFS-2952. NN should not start with upgrade option or with a pending an unfinalized upgrade. (atm) HDFS-2952. NN should not start with upgrade option or with a pending an unfinalized upgrade. (atm)
HDFS-2974. MiniDFSCluster does not delete standby NN name dirs during format. (atm) HDFS-2974. MiniDFSCluster does not delete standby NN name dirs during format. (atm)
HDFS-2929. Stress test and fixes for block synchronization (todd)

View File

@ -1804,6 +1804,13 @@ public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long newLength) throws IOException { long newLength) throws IOException {
ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock, ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength); recoveryId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
// block locations until the next block report.
ExtendedBlock newBlock = new ExtendedBlock(oldBlock);
newBlock.setGenerationStamp(recoveryId);
newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, "");
return new ExtendedBlock(oldBlock.getBlockPoolId(), r); return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
} }
@ -1930,7 +1937,6 @@ void syncBlock(RecoveringBlock rBlock,
// or their replicas have 0 length. // or their replicas have 0 length.
// The block can be deleted. // The block can be deleted.
if (syncList.isEmpty()) { if (syncList.isEmpty()) {
// TODO: how does this work in HA??
nn.commitBlockSynchronization(block, recoveryId, 0, nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY); true, true, DatanodeID.EMPTY_ARRAY);
return; return;

View File

@ -2826,12 +2826,9 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
writeLock(); writeLock();
try { try {
checkOperation(OperationCategory.WRITE); checkOperation(OperationCategory.WRITE);
if (haContext.getState().equals(NameNode.STANDBY_STATE)) { // If a DN tries to commit to the standby, the recovery will
// TODO(HA) we'll never get here, since we check for WRITE operation above! // fail, and the next retry will succeed on the new NN.
// Need to implement tests, etc, for this - block recovery spanning
// failover.
}
if (isInSafeMode()) { if (isInSafeMode()) {
throw new SafeModeException( throw new SafeModeException(
"Cannot commitBlockSynchronization while in safe mode", "Cannot commitBlockSynchronization while in safe mode",

View File

@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
/**
 * Utility class to start an HA cluster, and then start threads
 * to periodically fail back and forth, accelerate block deletion
 * processing, etc.
 * <p>
 * Typical usage: construct the harness, optionally adjust {@link #conf},
 * call {@link #startCluster()}, register background threads with the
 * {@code add*Thread} methods, call {@link #startThreads()}, and always
 * finish with {@link #shutdown()} (normally from a {@code finally} block).
 */
public class HAStressTestHarness {
  /**
   * Configuration used to build the cluster. Callers in this package may
   * set additional keys on it before calling {@link #startCluster()}.
   */
  final Configuration conf;
  /** The running cluster, or null before startCluster() / after shutdown(). */
  private MiniDFSCluster cluster;
  static final int BLOCK_SIZE = 1024;
  /** Owns every background thread registered through this harness. */
  final TestContext testCtx = new TestContext();

  /**
   * Create a harness whose configuration uses a small block size, a
   * 1-unit heartbeat and edit-tailing period, and a high replication
   * stream limit.
   */
  public HAStressTestHarness() {
    conf = new Configuration();
    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
    conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
    // Increase max streams so that we re-replicate quickly.
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
  }

  /**
   * Start and return the MiniDFSCluster.
   * The cluster is built from {@link #conf} with the simple HA topology
   * and three datanodes, and is remembered so that the other methods of
   * this harness can operate on it.
   *
   * @return the newly started cluster
   * @throws IOException if the cluster cannot be built
   */
  public MiniDFSCluster startCluster() throws IOException {
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(3)
        .build();
    return cluster;
  }

  /**
   * Return a filesystem with client-failover configured for the
   * cluster. {@link #startCluster()} must have been called first.
   */
  public FileSystem getFailoverFs() throws IOException, URISyntaxException {
    return HATestUtil.configureFailoverFs(cluster, conf);
  }

  /**
   * Add a thread which periodically triggers deletion reports,
   * heartbeats, and NN-side block work.
   * @param interval millisecond period on which to run
   */
  public void addReplicationTriggerThread(final int interval) {
    testCtx.addThread(new RepeatingTestThread(testCtx) {

      @Override
      public void doAnAction() throws Exception {
        for (DataNode dn : cluster.getDataNodes()) {
          DataNodeAdapter.triggerDeletionReport(dn);
          DataNodeAdapter.triggerHeartbeat(dn);
        }
        // Run pending block work on both namenodes (indices 0 and 1).
        for (int i = 0; i < 2; i++) {
          NameNode nn = cluster.getNameNode(i);
          BlockManagerTestUtil.computeAllPendingWork(
              nn.getNamesystem().getBlockManager());
        }
        Thread.sleep(interval);
      }
    });
  }

  /**
   * Add a thread which periodically triggers failover back and forth between
   * the two namenodes.
   * @param msBetweenFailovers milliseconds to sleep after each failover
   */
  public void addFailoverThread(final int msBetweenFailovers) {
    testCtx.addThread(new RepeatingTestThread(testCtx) {

      @Override
      public void doAnAction() throws Exception {
        System.err.println("==============================\n" +
            "Failing over from 0->1\n" +
            "==================================");
        cluster.transitionToStandby(0);
        cluster.transitionToActive(1);

        Thread.sleep(msBetweenFailovers);
        System.err.println("==============================\n" +
            "Failing over from 1->0\n" +
            "==================================");

        cluster.transitionToStandby(1);
        cluster.transitionToActive(0);
        Thread.sleep(msBetweenFailovers);
      }
    });
  }

  /**
   * Start all of the threads which have been added.
   */
  public void startThreads() {
    this.testCtx.startThreads();
  }

  /**
   * Stop threads, propagating any exceptions that might have been thrown.
   */
  public void stopThreads() throws Exception {
    this.testCtx.stop();
  }

  /**
   * Shutdown the minicluster, as well as any of the running threads.
   * The cluster is shut down even when stopping the threads throws, so
   * that a failure in a background thread does not leak the cluster into
   * later tests.
   *
   * @throws Exception if stopping the threads or shutting down the
   *         cluster fails
   */
  public void shutdown() throws Exception {
    try {
      this.testCtx.stop();
    } finally {
      if (cluster != null) {
        this.cluster.shutdown();
        cluster = null;
      }
    }
  }
}

View File

@ -22,19 +22,13 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread; import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
@ -111,28 +105,16 @@ public String toString() {
@Test @Test
public void testFencingStress() throws Exception { public void testFencingStress() throws Exception {
Configuration conf = new Configuration(); HAStressTestHarness harness = new HAStressTestHarness();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); harness.conf.setInt(
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
// Increase max streams so that we re-replicate quickly.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
final MiniDFSCluster cluster = harness.startCluster();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(3)
.build();
try { try {
cluster.waitActive(); cluster.waitActive();
cluster.transitionToActive(0); cluster.transitionToActive(0);
final NameNode nn1 = cluster.getNameNode(0); FileSystem fs = harness.getFailoverFs();
final NameNode nn2 = cluster.getNameNode(1);
FileSystem fs = HATestUtil.configureFailoverFs(
cluster, conf);
TestContext togglers = new TestContext(); TestContext togglers = new TestContext();
for (int i = 0; i < NUM_THREADS; i++) { for (int i = 0; i < NUM_THREADS; i++) {
Path p = new Path("/test-" + i); Path p = new Path("/test-" + i);
@ -143,51 +125,14 @@ public void testFencingStress() throws Exception {
// Start a separate thread which will make sure that replication // Start a separate thread which will make sure that replication
// happens quickly by triggering deletion reports and replication // happens quickly by triggering deletion reports and replication
// work calculation frequently. // work calculation frequently.
TestContext triggerCtx = new TestContext(); harness.addReplicationTriggerThread(500);
triggerCtx.addThread(new RepeatingTestThread(triggerCtx) { harness.addFailoverThread(5000);
harness.startThreads();
@Override
public void doAnAction() throws Exception {
for (DataNode dn : cluster.getDataNodes()) {
DataNodeAdapter.triggerDeletionReport(dn);
DataNodeAdapter.triggerHeartbeat(dn);
}
for (int i = 0; i < 2; i++) {
NameNode nn = cluster.getNameNode(i);
BlockManagerTestUtil.computeAllPendingWork(
nn.getNamesystem().getBlockManager());
}
Thread.sleep(500);
}
});
triggerCtx.addThread(new RepeatingTestThread(triggerCtx) {
@Override
public void doAnAction() throws Exception {
System.err.println("==============================\n" +
"Failing over from 0->1\n" +
"==================================");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
Thread.sleep(5000);
System.err.println("==============================\n" +
"Failing over from 1->0\n" +
"==================================");
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
Thread.sleep(5000);
}
});
triggerCtx.startThreads();
togglers.startThreads(); togglers.startThreads();
togglers.waitFor(RUNTIME); togglers.waitFor(RUNTIME);
togglers.stop(); togglers.stop();
triggerCtx.stop(); harness.stopThreads();
// CHeck that the files can be read without throwing // CHeck that the files can be read without throwing
for (int i = 0; i < NUM_THREADS; i++) { for (int i = 0; i < NUM_THREADS; i++) {
@ -196,7 +141,7 @@ public void doAnAction() throws Exception {
} }
} finally { } finally {
System.err.println("===========================\n\n\n\n"); System.err.println("===========================\n\n\n\n");
cluster.shutdown(); harness.shutdown();
} }
} }

View File

@ -18,9 +18,10 @@
package org.apache.hadoop.hdfs.server.namenode.ha; package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -31,19 +32,35 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.TestDFSClientFailover; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.base.Supplier;
/** /**
* Test cases regarding pipeline recovery during NN failover. * Test cases regarding pipeline recovery during NN failover.
@ -64,6 +81,9 @@ public class TestPipelinesFailover {
new Path("/test-file"); new Path("/test-file");
private static final int BLOCK_SIZE = 4096; private static final int BLOCK_SIZE = 4096;
private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2; private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2;
private static final int STRESS_NUM_THREADS = 25;
private static final int STRESS_RUNTIME = 40000;
/** /**
* Tests continuing a write pipeline over a failover. * Tests continuing a write pipeline over a failover.
@ -216,22 +236,271 @@ public void testLeaseRecoveryAfterFailover() throws Exception {
cluster.transitionToActive(1); cluster.transitionToActive(1);
assertTrue(fs.exists(TEST_PATH)); assertTrue(fs.exists(TEST_PATH));
FileSystem fsOtherUser = UserGroupInformation.createUserForTesting( FileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
"otheruser", new String[] { "othergroup"}) loopRecoverLease(fsOtherUser, TEST_PATH);
.doAs(new PrivilegedExceptionAction<FileSystem>() {
@Override
public FileSystem run() throws Exception {
return HATestUtil.configureFailoverFs(cluster, conf);
}
});
((DistributedFileSystem)fsOtherUser).recoverLease(TEST_PATH);
AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF); AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
// Fail back to ensure that the block locations weren't lost on the
// original node.
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF);
} finally { } finally {
IOUtils.closeStream(stm); IOUtils.closeStream(stm);
cluster.shutdown(); cluster.shutdown();
} }
} }
/**
* Test the scenario where the NN fails over after issuing a block
* synchronization request, but before it is committed. The
* DN running the recovery should then fail to commit the synchronization
* and a later retry will succeed.
*
* Outline: write half a block and hflush it, hold back the primary DN's
* commitBlockSynchronization call with a DelayAnswer, fail over from
* NN 0 to NN 1 while the call is held, release the call and verify it
* threw, then retry lease recovery and verify the file contents.
*/
@Test(timeout=30000)
public void testFailoverRightBeforeCommitSynchronization() throws Exception {
final Configuration conf = new Configuration();
// Disable permissions so that another user can recover the lease.
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
// Declared outside the try block so the finally clause can close it.
FSDataOutputStream stm = null;
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(3)
.build();
try {
cluster.waitActive();
cluster.transitionToActive(0);
// NOTE(review): fixed pause after the transition; the reason for it is
// not evident from this method.
Thread.sleep(500);
LOG.info("Starting with NN 0 active");
FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
stm = fs.create(TEST_PATH);
// write a half block
AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2);
stm.hflush();
// Look into the block manager on the active node for the block
// under construction.
NameNode nn0 = cluster.getNameNode(0);
ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk);
LOG.info("Expecting block recovery to be triggered on DN " +
expectedPrimary);
// Find the corresponding DN daemon, and spy on its connection to the
// active.
DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort());
DatanodeProtocolClientSideTranslatorPB nnSpy =
DataNodeAdapter.spyOnBposToNN(primaryDN, nn0);
// Delay the commitBlockSynchronization call
// Only calls for this block with closeFile=true and deleteBlock=false
// are intercepted; the remaining arguments match any value.
DelayAnswer delayer = new DelayAnswer(LOG);
Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization(
Mockito.eq(blk),
Mockito.anyInt(), // new genstamp
Mockito.anyLong(), // new length
Mockito.eq(true), // close file
Mockito.eq(false), // delete block
(DatanodeID[]) Mockito.anyObject()); // new targets
// Kick off lease recovery as a different user. The first call is
// expected to return false, i.e. recovery has not completed yet.
DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
assertFalse(fsOtherUser.recoverLease(TEST_PATH));
LOG.info("Waiting for commitBlockSynchronization call from primary");
delayer.waitForCall();
// The commit call is now being held by the delayer; fail over while it
// is still pending.
LOG.info("Failing over to NN 1");
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
// Let the commitBlockSynchronization call go through, and check that
// it failed with the correct exception.
delayer.proceed();
delayer.waitForResult();
Throwable t = delayer.getThrown();
// A null here means the call completed without throwing, which would
// mean the standby accepted the commit.
if (t == null) {
fail("commitBlockSynchronization call did not fail on standby");
}
GenericTestUtils.assertExceptionContains(
"Operation category WRITE is not supported",
t);
// Now, if we try again to recover the block, it should succeed on the new
// active.
loopRecoverLease(fsOtherUser, TEST_PATH);
AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE/2);
} finally {
IOUtils.closeStream(stm);
cluster.shutdown();
}
}
/**
 * Stress-tests pipeline and lease recovery under repeated failover.
 * A set of worker threads each repeatedly create a file and have a
 * second client break its lease, while background harness threads
 * trigger replication work and bounce the active role between the
 * two namenodes.
 */
@Test(timeout=STRESS_RUNTIME*3)
public void testPipelineRecoveryStress() throws Exception {
  final HAStressTestHarness stressHarness = new HAStressTestHarness();
  // Permissions are switched off so a different user may recover the lease.
  stressHarness.conf.setBoolean(
      DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false);

  final MiniDFSCluster miniCluster = stressHarness.startCluster();
  try {
    miniCluster.waitActive();
    miniCluster.transitionToActive(0);

    final FileSystem writerFs = stressHarness.getFailoverFs();
    final DistributedFileSystem recovererFs =
        createFsAsOtherUser(miniCluster, stressHarness.conf);

    // One worker per file, all sharing the same two filesystem clients.
    final TestContext workers = new TestContext();
    for (int idx = 0; idx < STRESS_NUM_THREADS; idx++) {
      final Path file = new Path("/test-" + idx);
      final PipelineTestThread worker =
          new PipelineTestThread(workers, writerFs, recovererFs, file);
      workers.addThread(worker);
    }

    // Background threads: one keeps replication moving by triggering
    // deletion reports and block work, the other drives the failovers.
    stressHarness.addReplicationTriggerThread(500);
    stressHarness.addFailoverThread(5000);
    stressHarness.startThreads();

    // Run the workers for the configured duration, then wind down the
    // workers first and the harness threads second.
    workers.startThreads();
    workers.waitFor(STRESS_RUNTIME);
    workers.stop();
    stressHarness.stopThreads();
  } finally {
    System.err.println("===========================\n\n\n\n");
    stressHarness.shutdown();
  }
}
/**
 * Worker thread for the pipeline stress test. Each iteration creates
 * (overwriting) a file, writes and hflushes 100 bytes, has a second
 * client recover the lease, and then verifies that the 100 bytes read
 * back correctly. Any failure in those steps propagates to the owning
 * test context and so fails the test case.
 */
private static class PipelineTestThread extends RepeatingTestThread {
  private final FileSystem fs;
  private final FileSystem fsOtherUser;
  private final Path path;

  public PipelineTestThread(TestContext ctx,
      FileSystem fs, FileSystem fsOtherUser, Path p) {
    super(ctx);
    this.path = p;
    this.fsOtherUser = fsOtherUser;
    this.fs = fs;
  }

  @Override
  public void doAnAction() throws Exception {
    final FSDataOutputStream out = fs.create(path, true);
    try {
      AppendTestUtil.write(out, 0, 100);
      out.hflush();

      loopRecoverLease(fsOtherUser, path);
      AppendTestUtil.check(fs, path, 100);
    } finally {
      closeAfterLeaseLoss(out);
    }
  }

  /**
   * Closes the writer's stream, ignoring an IOException from close().
   */
  private static void closeAfterLeaseLoss(FSDataOutputStream out) {
    try {
      out.close();
    } catch (IOException expected) {
      // should expect this since we lost the lease
    }
  }

  @Override
  public String toString() {
    return "Pipeline test thread for " + path;
  }
}
/**
 * Looks up the given block in the given NameNode's block manager,
 * asserts that it is still under construction, and picks the datanode
 * expected to drive its recovery.
 *
 * @param nn the NameNode whose block manager is consulted
 * @param blk the block, which must be under construction on that NameNode
 * @return the node which is expected to run the recovery of the
 * given block
 */
private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
    ExtendedBlock blk) {
  final BlockInfo stored = nn.getNamesystem().getBlockManager()
      .getStoredBlock(blk.getLocalBlock());
  final String failureMessage =
      "Block " + blk + " should be under construction, " + "got: " + stored;
  assertTrue(failureMessage, stored instanceof BlockInfoUnderConstruction);

  // We expect that the first indexed replica will be the one
  // to be in charge of the synchronization / recovery protocol.
  return ((BlockInfoUnderConstruction) stored).getExpectedLocations()[0];
}
/**
 * Builds a failover-configured filesystem client for the given cluster
 * while acting as the test user "otheruser" (group "othergroup"), so that
 * the returned client is distinct from the one that created the file.
 *
 * @param cluster the cluster to connect to
 * @param conf the configuration passed to the failover setup
 * @return the filesystem created as the other user
 */
private DistributedFileSystem createFsAsOtherUser(
    final MiniDFSCluster cluster, final Configuration conf)
    throws IOException, InterruptedException {
  final UserGroupInformation otherUgi =
      UserGroupInformation.createUserForTesting(
          "otheruser", new String[] { "othergroup"});

  final PrivilegedExceptionAction<FileSystem> openFailoverFs =
      new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          return HATestUtil.configureFailoverFs(
              cluster, conf);
        }
      };

  final FileSystem otherUserFs = otherUgi.doAs(openFailoverFs);
  return (DistributedFileSystem) otherUserFs;
}
/**
 * Try to recover the lease on the given file for up to 30
 * seconds, retrying until {@code recoverLease} reports success.
 * An IOException from {@code recoverLease} is rethrown wrapped in a
 * RuntimeException.
 *
 * @param fsOtherUser the filesystem to use for the recoverLease call;
 *        must be a DistributedFileSystem
 * @param testPath the path on which to run lease recovery
 * @throws TimeoutException if lease recovery does not succeed within 30
 *         seconds; the message names the path and the original timeout
 *         is attached as the cause
 * @throws InterruptedException if the thread is interrupted
 */
private static void loopRecoverLease(
    final FileSystem fsOtherUser, final Path testPath)
    throws TimeoutException, InterruptedException {
  try {
    // NOTE(review): 1000 is presumably the poll interval in ms and 30000
    // the overall limit in ms -- confirm against GenericTestUtils.waitFor.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        boolean success;
        try {
          success = ((DistributedFileSystem)fsOtherUser)
              .recoverLease(testPath);
        } catch (IOException e) {
          // Supplier.get() cannot throw checked exceptions.
          throw new RuntimeException(e);
        }
        if (!success) {
          LOG.info("Waiting to recover lease successfully");
        }
        return success;
      }
    }, 1000, 30000);
  } catch (TimeoutException e) {
    // Rethrow with the path in the message. TimeoutException has no
    // (message, cause) constructor, so attach the original explicitly
    // rather than losing its stack trace.
    TimeoutException withPath = new TimeoutException(
        "Timed out recovering lease for " + testPath);
    withPath.initCause(e);
    throw withPath;
  }
}
} }

View File

@ -110,7 +110,11 @@ public static class DelayAnswer implements Answer<Object> {
private final CountDownLatch fireLatch = new CountDownLatch(1); private final CountDownLatch fireLatch = new CountDownLatch(1);
private final CountDownLatch waitLatch = new CountDownLatch(1); private final CountDownLatch waitLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1);
// Result fields set after proceed() is called.
private volatile Throwable thrown;
private volatile Object returnValue;
public DelayAnswer(Log log) { public DelayAnswer(Log log) {
this.LOG = log; this.LOG = log;
@ -145,7 +149,40 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
} }
protected Object passThrough(InvocationOnMock invocation) throws Throwable { protected Object passThrough(InvocationOnMock invocation) throws Throwable {
return invocation.callRealMethod(); try {
Object ret = invocation.callRealMethod();
returnValue = ret;
return ret;
} catch (Throwable t) {
thrown = t;
throw t;
} finally {
resultLatch.countDown();
}
}
/**
* After calling proceed(), this will wait until the call has
* completed and a result has been returned to the caller.
* The underlying latch is counted down in the finally clause of
* passThrough(), so this returns both when the real method returned
* normally and when it threw.
*
* @throws InterruptedException if interrupted while waiting
*/
public void waitForResult() throws InterruptedException {
resultLatch.await();
}
/**
* After the call has gone through, return any exception that
* was thrown, or null if no exception was thrown.
* The field is only assigned when the real method throws, so null is
* also returned if this is read before the call has completed; call
* waitForResult() first to tell the two cases apart.
*/
public Throwable getThrown() {
return thrown;
}
/**
* After the call has gone through, return the call's return value,
* or null in case it was void or an exception was thrown.
*/
public Object getReturnValue() {
return returnValue;
} }
} }