diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt index 4d96857e72..4893c54829 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt @@ -220,3 +220,5 @@ HDFS-2586. Add protobuf service and implementation for HAServiceProtocol. (sures HDFS-2952. NN should not start with upgrade option or with a pending an unfinalized upgrade. (atm) HDFS-2974. MiniDFSCluster does not delete standby NN name dirs during format. (atm) + +HDFS-2929. Stress test and fixes for block synchronization (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 76371918e5..b13041b3ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -1804,6 +1804,13 @@ public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, long newLength) throws IOException { ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock, recoveryId, newLength); + // Notify the namenode of the updated block info. This is important + // for HA, since otherwise the standby node may lose track of the + // block locations until the next block report. + ExtendedBlock newBlock = new ExtendedBlock(oldBlock); + newBlock.setGenerationStamp(recoveryId); + newBlock.setNumBytes(newLength); + notifyNamenodeReceivedBlock(newBlock, ""); return new ExtendedBlock(oldBlock.getBlockPoolId(), r); } @@ -1930,7 +1937,6 @@ void syncBlock(RecoveringBlock rBlock, // or their replicas have 0 length. // The block can be deleted. if (syncList.isEmpty()) { - // TODO: how does this work in HA?? nn.commitBlockSynchronization(block, recoveryId, 0, true, true, DatanodeID.EMPTY_ARRAY); return; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 8edc4bc88a..823ce8bce9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -2826,12 +2826,9 @@ void commitBlockSynchronization(ExtendedBlock lastblock, writeLock(); try { checkOperation(OperationCategory.WRITE); - if (haContext.getState().equals(NameNode.STANDBY_STATE)) { - // TODO(HA) we'll never get here, since we check for WRITE operation above! - // Need to implement tests, etc, for this - block recovery spanning - // failover. - } - + // If a DN tries to commit to the standby, the recovery will + // fail, and the next retry will succeed on the new NN. + if (isInSafeMode()) { throw new SafeModeException( "Cannot commitBlockSynchronization while in safe mode", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java new file mode 100644 index 0000000000..39667eddf1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HAStressTestHarness.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode.ha; + +import java.io.IOException; +import java.net.URISyntaxException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; + +/** + * Utility class to start an HA cluster, and then start threads + * to periodically fail back and forth, accelerate block deletion + * processing, etc. + */ +public class HAStressTestHarness { + Configuration conf; + private MiniDFSCluster cluster; + static final int BLOCK_SIZE = 1024; + TestContext testCtx = new TestContext(); + + public HAStressTestHarness() { + conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + // Increase max streams so that we re-replicate quickly. + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000); + } + + /** + * Start and return the MiniDFSCluster. + */ + public MiniDFSCluster startCluster() throws IOException { + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(3) + .build(); + return cluster; + } + + /** + * Return a filesystem with client-failover configured for the + * cluster. + */ + public FileSystem getFailoverFs() throws IOException, URISyntaxException { + return HATestUtil.configureFailoverFs(cluster, conf); + } + + /** + * Add a thread which periodically triggers deletion reports, + * heartbeats, and NN-side block work. + * @param interval millisecond period on which to run + */ + public void addReplicationTriggerThread(final int interval) { + + testCtx.addThread(new RepeatingTestThread(testCtx) { + + @Override + public void doAnAction() throws Exception { + for (DataNode dn : cluster.getDataNodes()) { + DataNodeAdapter.triggerDeletionReport(dn); + DataNodeAdapter.triggerHeartbeat(dn); + } + for (int i = 0; i < 2; i++) { + NameNode nn = cluster.getNameNode(i); + BlockManagerTestUtil.computeAllPendingWork( + nn.getNamesystem().getBlockManager()); + } + Thread.sleep(interval); + } + }); + } + + /** + * Add a thread which periodically triggers failover back and forth between + * the two namenodes. + */ + public void addFailoverThread(final int msBetweenFailovers) { + testCtx.addThread(new RepeatingTestThread(testCtx) { + + @Override + public void doAnAction() throws Exception { + System.err.println("==============================\n" + + "Failing over from 0->1\n" + + "=================================="); + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + + Thread.sleep(msBetweenFailovers); + System.err.println("==============================\n" + + "Failing over from 1->0\n" + + "=================================="); + + cluster.transitionToStandby(1); + cluster.transitionToActive(0); + Thread.sleep(msBetweenFailovers); + } + }); + } + + /** + * Start all of the threads which have been added. + */ + public void startThreads() { + this.testCtx.startThreads(); + } + + /** + * Stop threads, propagating any exceptions that might have been thrown. + */ + public void stopThreads() throws Exception { + this.testCtx.stop(); + } + + /** + * Shutdown the minicluster, as well as any of the running threads. + */ + public void shutdown() throws Exception { + this.testCtx.stop(); + if (cluster != null) { + this.cluster.shutdown(); + cluster = null; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java index 44bc01d1cd..95d5eb941e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencingWithReplication.java @@ -22,19 +22,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread; @@ -111,28 +105,16 @@ public String toString() { @Test public void testFencingStress() throws Exception { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); - conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); - conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); - // Increase max streams so that we re-replicate quickly. - conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000); + HAStressTestHarness harness = new HAStressTestHarness(); + harness.conf.setInt( + DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000); - - final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .nnTopology(MiniDFSNNTopology.simpleHATopology()) - .numDataNodes(3) - .build(); + final MiniDFSCluster cluster = harness.startCluster(); try { cluster.waitActive(); cluster.transitionToActive(0); - final NameNode nn1 = cluster.getNameNode(0); - final NameNode nn2 = cluster.getNameNode(1); - - FileSystem fs = HATestUtil.configureFailoverFs( - cluster, conf); + FileSystem fs = harness.getFailoverFs(); TestContext togglers = new TestContext(); for (int i = 0; i < NUM_THREADS; i++) { Path p = new Path("/test-" + i); @@ -143,51 +125,14 @@ public void testFencingStress() throws Exception { // Start a separate thread which will make sure that replication // happens quickly by triggering deletion reports and replication // work calculation frequently. - TestContext triggerCtx = new TestContext(); - triggerCtx.addThread(new RepeatingTestThread(triggerCtx) { - - @Override - public void doAnAction() throws Exception { - for (DataNode dn : cluster.getDataNodes()) { - DataNodeAdapter.triggerDeletionReport(dn); - DataNodeAdapter.triggerHeartbeat(dn); - } - for (int i = 0; i < 2; i++) { - NameNode nn = cluster.getNameNode(i); - BlockManagerTestUtil.computeAllPendingWork( - nn.getNamesystem().getBlockManager()); - } - Thread.sleep(500); - } - }); - - triggerCtx.addThread(new RepeatingTestThread(triggerCtx) { - - @Override - public void doAnAction() throws Exception { - System.err.println("==============================\n" + - "Failing over from 0->1\n" + - "=================================="); - cluster.transitionToStandby(0); - cluster.transitionToActive(1); - - Thread.sleep(5000); - System.err.println("==============================\n" + - "Failing over from 1->0\n" + - "=================================="); - - cluster.transitionToStandby(1); - cluster.transitionToActive(0); - Thread.sleep(5000); - } - }); - - triggerCtx.startThreads(); + harness.addReplicationTriggerThread(500); + harness.addFailoverThread(5000); + harness.startThreads(); togglers.startThreads(); togglers.waitFor(RUNTIME); togglers.stop(); - triggerCtx.stop(); + harness.stopThreads(); // CHeck that the files can be read without throwing for (int i = 0; i < NUM_THREADS; i++) { @@ -196,7 +141,7 @@ public void doAnAction() throws Exception { } } finally { System.err.println("===========================\n\n\n\n"); - cluster.shutdown(); + harness.shutdown(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java index ce7347cdf0..465987c6cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPipelinesFailover.java @@ -18,9 +18,10 @@ package org.apache.hadoop.hdfs.server.namenode.ha; import static org.junit.Assert.*; -import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,19 +32,35 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.AppendTestUtil; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; -import org.apache.hadoop.hdfs.TestDFSClientFailover; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.DelayAnswer; +import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; + import org.apache.log4j.Level; -import org.junit.Ignore; + import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.base.Supplier; /** * Test cases regarding pipeline recovery during NN failover. @@ -64,6 +81,9 @@ public class TestPipelinesFailover { new Path("/test-file"); private static final int BLOCK_SIZE = 4096; private static final int BLOCK_AND_A_HALF = BLOCK_SIZE * 3 / 2; + + private static final int STRESS_NUM_THREADS = 25; + private static final int STRESS_RUNTIME = 40000; /** * Tests continuing a write pipeline over a failover. @@ -216,22 +236,271 @@ public void testLeaseRecoveryAfterFailover() throws Exception { cluster.transitionToActive(1); assertTrue(fs.exists(TEST_PATH)); - - FileSystem fsOtherUser = UserGroupInformation.createUserForTesting( - "otheruser", new String[] { "othergroup"}) - .doAs(new PrivilegedExceptionAction() { - @Override - public FileSystem run() throws Exception { - return HATestUtil.configureFailoverFs(cluster, conf); - } - }); - ((DistributedFileSystem)fsOtherUser).recoverLease(TEST_PATH); + + FileSystem fsOtherUser = createFsAsOtherUser(cluster, conf); + loopRecoverLease(fsOtherUser, TEST_PATH); AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF); + + // Fail back to ensure that the block locations weren't lost on the + // original node. + cluster.transitionToStandby(1); + cluster.transitionToActive(0); + AppendTestUtil.check(fs, TEST_PATH, BLOCK_AND_A_HALF); } finally { IOUtils.closeStream(stm); cluster.shutdown(); } } + /** + * Test the scenario where the NN fails over after issuing a block + * synchronization request, but before it is committed. The + * DN running the recovery should then fail to commit the synchronization + * and a later retry will succeed. + */ + @Test(timeout=30000) + public void testFailoverRightBeforeCommitSynchronization() throws Exception { + final Configuration conf = new Configuration(); + // Disable permissions so that another user can recover the lease. + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + + FSDataOutputStream stm = null; + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(3) + .build(); + try { + cluster.waitActive(); + cluster.transitionToActive(0); + Thread.sleep(500); + + LOG.info("Starting with NN 0 active"); + FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf); + stm = fs.create(TEST_PATH); + + // write a half block + AppendTestUtil.write(stm, 0, BLOCK_SIZE / 2); + stm.hflush(); + + // Look into the block manager on the active node for the block + // under construction. + + NameNode nn0 = cluster.getNameNode(0); + ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, TEST_PATH); + DatanodeDescriptor expectedPrimary = getExpectedPrimaryNode(nn0, blk); + LOG.info("Expecting block recovery to be triggered on DN " + + expectedPrimary); + + // Find the corresponding DN daemon, and spy on its connection to the + // active. + DataNode primaryDN = cluster.getDataNode(expectedPrimary.getIpcPort()); + DatanodeProtocolClientSideTranslatorPB nnSpy = + DataNodeAdapter.spyOnBposToNN(primaryDN, nn0); + + // Delay the commitBlockSynchronization call + DelayAnswer delayer = new DelayAnswer(LOG); + Mockito.doAnswer(delayer).when(nnSpy).commitBlockSynchronization( + Mockito.eq(blk), + Mockito.anyInt(), // new genstamp + Mockito.anyLong(), // new length + Mockito.eq(true), // close file + Mockito.eq(false), // delete block + (DatanodeID[]) Mockito.anyObject()); // new targets + + DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf); + assertFalse(fsOtherUser.recoverLease(TEST_PATH)); + + LOG.info("Waiting for commitBlockSynchronization call from primary"); + delayer.waitForCall(); + + LOG.info("Failing over to NN 1"); + + cluster.transitionToStandby(0); + cluster.transitionToActive(1); + + // Let the commitBlockSynchronization call go through, and check that + // it failed with the correct exception. + delayer.proceed(); + delayer.waitForResult(); + Throwable t = delayer.getThrown(); + if (t == null) { + fail("commitBlockSynchronization call did not fail on standby"); + } + GenericTestUtils.assertExceptionContains( + "Operation category WRITE is not supported", + t); + + // Now, if we try again to recover the block, it should succeed on the new + // active. + loopRecoverLease(fsOtherUser, TEST_PATH); + + AppendTestUtil.check(fs, TEST_PATH, BLOCK_SIZE/2); + } finally { + IOUtils.closeStream(stm); + cluster.shutdown(); + } + } + + /** + * Stress test for pipeline/lease recovery. Starts a number of + * threads, each of which creates a file and has another client + * break the lease. While these threads run, failover proceeds + * back and forth between two namenodes. + */ + @Test(timeout=STRESS_RUNTIME*3) + public void testPipelineRecoveryStress() throws Exception { + HAStressTestHarness harness = new HAStressTestHarness(); + // Disable permissions so that another user can recover the lease. + harness.conf.setBoolean( + DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); + + final MiniDFSCluster cluster = harness.startCluster(); + try { + cluster.waitActive(); + cluster.transitionToActive(0); + + FileSystem fs = harness.getFailoverFs(); + DistributedFileSystem fsAsOtherUser = createFsAsOtherUser( + cluster, harness.conf); + + TestContext testers = new TestContext(); + for (int i = 0; i < STRESS_NUM_THREADS; i++) { + Path p = new Path("/test-" + i); + testers.addThread(new PipelineTestThread( + testers, fs, fsAsOtherUser, p)); + } + + // Start a separate thread which will make sure that replication + // happens quickly by triggering deletion reports and replication + // work calculation frequently. + harness.addReplicationTriggerThread(500); + harness.addFailoverThread(5000); + harness.startThreads(); + testers.startThreads(); + + testers.waitFor(STRESS_RUNTIME); + testers.stop(); + harness.stopThreads(); + } finally { + System.err.println("===========================\n\n\n\n"); + harness.shutdown(); + } + } + + /** + * Test thread which creates a file, has another fake user recover + * the lease on the file, and then ensures that the file's contents + * are properly readable. If any of these steps fails, propagates + * an exception back to the test context, causing the test case + * to fail. + */ + private static class PipelineTestThread extends RepeatingTestThread { + private final FileSystem fs; + private final FileSystem fsOtherUser; + private final Path path; + + + public PipelineTestThread(TestContext ctx, + FileSystem fs, FileSystem fsOtherUser, Path p) { + super(ctx); + this.fs = fs; + this.fsOtherUser = fsOtherUser; + this.path = p; + } + + @Override + public void doAnAction() throws Exception { + FSDataOutputStream stm = fs.create(path, true); + try { + AppendTestUtil.write(stm, 0, 100); + stm.hflush(); + loopRecoverLease(fsOtherUser, path); + AppendTestUtil.check(fs, path, 100); + } finally { + try { + stm.close(); + } catch (IOException e) { + // should expect this since we lost the lease + } + } + } + + @Override + public String toString() { + return "Pipeline test thread for " + path; + } + } + + + + /** + * @return the node which is expected to run the recovery of the + * given block, which is known to be under construction inside the + * given NameNOde. + */ + private DatanodeDescriptor getExpectedPrimaryNode(NameNode nn, + ExtendedBlock blk) { + BlockManager bm0 = nn.getNamesystem().getBlockManager(); + BlockInfo storedBlock = bm0.getStoredBlock(blk.getLocalBlock()); + assertTrue("Block " + blk + " should be under construction, " + + "got: " + storedBlock, + storedBlock instanceof BlockInfoUnderConstruction); + BlockInfoUnderConstruction ucBlock = + (BlockInfoUnderConstruction)storedBlock; + // We expect that the first indexed replica will be the one + // to be in charge of the synchronization / recovery protocol. + DatanodeDescriptor expectedPrimary = ucBlock.getExpectedLocations()[0]; + return expectedPrimary; + } + + private DistributedFileSystem createFsAsOtherUser( + final MiniDFSCluster cluster, final Configuration conf) + throws IOException, InterruptedException { + return (DistributedFileSystem) UserGroupInformation.createUserForTesting( + "otheruser", new String[] { "othergroup"}) + .doAs(new PrivilegedExceptionAction() { + @Override + public FileSystem run() throws Exception { + return HATestUtil.configureFailoverFs( + cluster, conf); + } + }); + } + + /** + * Try to cover the lease on the given file for up to 30 + * seconds. + * @param fsOtherUser the filesystem to use for the recoverLease call + * @param testPath the path on which to run lease recovery + * @throws TimeoutException if lease recover does not succeed within 30 + * seconds + * @throws InterruptedException if the thread is interrupted + */ + private static void loopRecoverLease( + final FileSystem fsOtherUser, final Path testPath) + throws TimeoutException, InterruptedException { + try { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + boolean success; + try { + success = ((DistributedFileSystem)fsOtherUser) + .recoverLease(testPath); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (!success) { + LOG.info("Waiting to recover lease successfully"); + } + return success; + } + }, 1000, 30000); + } catch (TimeoutException e) { + throw new TimeoutException("Timed out recovering lease for " + + testPath); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java index f723a85bf4..23d1bb13a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/test/GenericTestUtils.java @@ -110,7 +110,11 @@ public static class DelayAnswer implements Answer { private final CountDownLatch fireLatch = new CountDownLatch(1); private final CountDownLatch waitLatch = new CountDownLatch(1); - + private final CountDownLatch resultLatch = new CountDownLatch(1); + + // Result fields set after proceed() is called. + private volatile Throwable thrown; + private volatile Object returnValue; public DelayAnswer(Log log) { this.LOG = log; @@ -145,7 +149,40 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } protected Object passThrough(InvocationOnMock invocation) throws Throwable { - return invocation.callRealMethod(); + try { + Object ret = invocation.callRealMethod(); + returnValue = ret; + return ret; + } catch (Throwable t) { + thrown = t; + throw t; + } finally { + resultLatch.countDown(); + } + } + + /** + * After calling proceed(), this will wait until the call has + * completed and a result has been returned to the caller. + */ + public void waitForResult() throws InterruptedException { + resultLatch.await(); + } + + /** + * After the call has gone through, return any exception that + * was thrown, or null if no exception was thrown. + */ + public Throwable getThrown() { + return thrown; + } + + /** + * After the call has gone through, return the call's return value, + * or null in case it was void or an exception was thrown. + */ + public Object getReturnValue() { + return returnValue; } }