diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 21e2a3b518..c1cda5735e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -623,6 +623,7 @@ private void offerService() throws Exception { // while (shouldRun()) { try { + DataNodeFaultInjector.get().startOfferService(); final long startTime = scheduler.monotonicNow(); // @@ -725,6 +726,8 @@ private void offerService() throws Exception { } catch (IOException e) { LOG.warn("IOException in offerService", e); sleepAfterException(); + } finally { + DataNodeFaultInjector.get().endOfferService(); } processQueueMessages(); } // while (shouldRun()) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java index b74d2c9008..d2d557f1c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java @@ -30,7 +30,7 @@ @VisibleForTesting @InterfaceAudience.Private public class DataNodeFaultInjector { - public static DataNodeFaultInjector instance = new DataNodeFaultInjector(); + private static DataNodeFaultInjector instance = new DataNodeFaultInjector(); public static DataNodeFaultInjector get() { return instance; @@ -81,4 +81,8 @@ public void failMirrorConnection() throws IOException { } public void failPipeline(ReplicaInPipeline replicaInfo, String mirrorAddr) throws IOException { } + + public void startOfferService() throws Exception {} + + public void endOfferService() throws Exception {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index a40425926e..9abc19dcbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -32,6 +32,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import net.jcip.annotations.NotThreadSafe; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -58,6 +59,7 @@ import javax.management.MBeanServer; import javax.management.ObjectName; +@NotThreadSafe public class TestDataNodeMetrics { private static final Log LOG = LogFactory.getLog(TestDataNodeMetrics.class); @@ -216,6 +218,7 @@ public void testTimeoutMetric() throws Exception { new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); final List streams = Lists.newArrayList(); + DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get(); try { final FSDataOutputStream out = cluster.getFileSystem().create(path, (short) 2); @@ -224,7 +227,7 @@ public void testTimeoutMetric() throws Exception { Mockito.doThrow(new IOException("mock IOException")). when(injector). writeBlockAfterFlush(); - DataNodeFaultInjector.instance = injector; + DataNodeFaultInjector.set(injector); streams.add(out); out.writeBytes("old gs data\n"); out.hflush(); @@ -250,7 +253,7 @@ public void testTimeoutMetric() throws Exception { if (cluster != null) { cluster.shutdown(); } - DataNodeFaultInjector.instance = new DataNodeFaultInjector(); + DataNodeFaultInjector.set(oldInjector); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 28bf13bb39..2dbd5b9bd0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import net.jcip.annotations.NotThreadSafe; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.junit.Assert.assertEquals; @@ -34,6 +35,8 @@ import java.util.HashSet; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -80,8 +83,10 @@ import org.apache.hadoop.test.MetricsAsserts; import org.apache.log4j.Logger; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -91,6 +96,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY; +@NotThreadSafe public class TestFsDatasetCache { private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class); @@ -110,13 +116,39 @@ public class TestFsDatasetCache { private static DataNode dn; private static FsDatasetSpi fsd; private static DatanodeProtocolClientSideTranslatorPB spyNN; + /** + * Used to pause DN BPServiceActor threads. BPSA threads acquire the + * shared read lock. The test acquires the write lock for exclusive access. + */ + private static ReadWriteLock lock = new ReentrantReadWriteLock(true); private static final PageRounder rounder = new PageRounder(); private static CacheManipulator prevCacheManipulator; + private static DataNodeFaultInjector oldInjector; static { LogManager.getLogger(FsDatasetCache.class).setLevel(Level.DEBUG); } + @BeforeClass + public static void setUpClass() throws Exception { + oldInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector.set(new DataNodeFaultInjector() { + @Override + public void startOfferService() throws Exception { + lock.readLock().lock(); + } + @Override + public void endOfferService() throws Exception { + lock.readLock().unlock(); + } + }); + } + + @AfterClass + public static void tearDownClass() throws Exception { + DataNodeFaultInjector.set(oldInjector); + } + @Before public void setUp() throws Exception { conf = new HdfsConfiguration(); @@ -143,7 +175,6 @@ public void setUp() throws Exception { fsd = dn.getFSDataset(); spyNN = InternalDataNodeTestUtils.spyOnBposToNN(dn, nn); - } @After @@ -164,18 +195,23 @@ public void tearDown() throws Exception { } private static void setHeartbeatResponse(DatanodeCommand[] cmds) - throws IOException { - NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, - fsImage.getLastAppliedOrWrittenTxId()); - HeartbeatResponse response = - new HeartbeatResponse(cmds, ha, null, - ThreadLocalRandom.current().nextLong() | 1L); - doReturn(response).when(spyNN).sendHeartbeat( - (DatanodeRegistration) any(), - (StorageReport[]) any(), anyLong(), anyLong(), - anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(), - anyBoolean(), any(SlowPeerReports.class), - any(SlowDiskReports.class)); + throws Exception { + lock.writeLock().lock(); + try { + NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, + fsImage.getLastAppliedOrWrittenTxId()); + HeartbeatResponse response = + new HeartbeatResponse(cmds, ha, null, + ThreadLocalRandom.current().nextLong() | 1L); + doReturn(response).when(spyNN).sendHeartbeat( + (DatanodeRegistration) any(), + (StorageReport[]) any(), anyLong(), anyLong(), + anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(), + anyBoolean(), any(SlowPeerReports.class), + any(SlowDiskReports.class)); + } finally { + lock.writeLock().unlock(); + } } private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 06c6cf64d9..7ba0edcecc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.concurrent.TimeoutException; +import net.jcip.annotations.NotThreadSafe; import org.apache.commons.collections.map.LinkedMap; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; @@ -81,6 +82,7 @@ import com.google.common.base.Supplier; import com.google.common.collect.HashMultimap; +@NotThreadSafe public class TestShortCircuitCache { static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class); @@ -723,8 +725,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { throw new IOException("injected error into sendShmResponse"); } }).when(failureInjector).sendShortCircuitShmResponse(); - DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance; - DataNodeFaultInjector.instance = failureInjector; + DataNodeFaultInjector prevInjector = DataNodeFaultInjector.get(); + DataNodeFaultInjector.set(failureInjector); try { // The first read will try to allocate a shared memory segment and slot. @@ -741,7 +743,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { cluster.getDataNodes().get(0).getShortCircuitRegistry()); LOG.info("Clearing failure injector and performing another read..."); - DataNodeFaultInjector.instance = prevInjector; + DataNodeFaultInjector.set(prevInjector); fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();