diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index d6c47b5f92..38c2435aa1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -127,6 +127,9 @@ public interface HdfsClientConfigKeys {
   boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
   String DFS_DOMAIN_SOCKET_PATH_KEY = "dfs.domain.socket.path";
   String DFS_DOMAIN_SOCKET_PATH_DEFAULT = "";
+  String DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY =
+      "dfs.domain.socket.disable.interval.seconds";
+  long DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT = 600;
   String DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS =
       "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
   int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
index 6f3fc611a7..60dde82fff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.client.impl;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
 
 import java.io.BufferedOutputStream;
@@ -644,10 +645,17 @@ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
       LOG.debug("{}:{}", this, msg);
       return new ShortCircuitReplicaInfo(new InvalidToken(msg));
     default:
-      LOG.warn(this + ": unknown response code " + resp.getStatus() +
-          " while attempting to set up short-circuit access. " +
-          resp.getMessage() + ". Disabling short-circuit read for DataNode "
-          + datanode + " temporarily.");
+      final long expiration =
+          clientContext.getDomainSocketFactory().getPathExpireSeconds();
+      String disableMsg = "disabled temporarily for " + expiration + " seconds";
+      if (expiration == 0) {
+        disableMsg = "not disabled";
+      }
+      LOG.warn("{}: unknown response code {} while attempting to set up "
+              + "short-circuit access. {}. Short-circuit read for "
+              + "DataNode {} is {} based on {}.",
+          this, resp.getStatus(), resp.getMessage(), datanode,
+          disableMsg, DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY);
       clientContext.getDomainSocketFactory()
           .disableShortCircuitForPath(pathInfo.getPath());
       return null;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index 332abb5e44..2703617812 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -68,6 +68,8 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT;
@@ -602,6 +604,7 @@ public static class ShortCircuitConf {
     private final long shortCircuitMmapCacheExpiryMs;
     private final long shortCircuitMmapCacheRetryTimeout;
     private final long shortCircuitCacheStaleThresholdMs;
+    private final long domainSocketDisableIntervalSeconds;
 
     private final long keyProviderCacheExpiryMs;
 
@@ -679,6 +682,11 @@ public ShortCircuitConf(Configuration conf) {
       shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
           DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
           DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
+      domainSocketDisableIntervalSeconds = conf.getLong(
+          DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY,
+          DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT);
+      Preconditions.checkArgument(domainSocketDisableIntervalSeconds >= 0,
+          DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY + " can't be negative.");
 
       keyProviderCacheExpiryMs = conf.getLong(
           DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_MS,
@@ -793,6 +801,13 @@ public long getShortCircuitCacheStaleThresholdMs() {
       return shortCircuitCacheStaleThresholdMs;
     }
 
+    /**
+     * @return the domainSocketDisableIntervalSeconds
+     */
+    public long getDomainSocketDisableIntervalSeconds() {
+      return domainSocketDisableIntervalSeconds;
+    }
+
     /**
      * @return the keyProviderCacheExpiryMs
      */
@@ -827,7 +842,9 @@ public String confAsString() {
           + ", shortCircuitSharedMemoryWatcherInterruptCheckMs = "
           + shortCircuitSharedMemoryWatcherInterruptCheckMs
           + ", keyProviderCacheExpiryMs = "
-          + keyProviderCacheExpiryMs;
+          + keyProviderCacheExpiryMs
+          + ", domainSocketDisableIntervalSeconds = "
+          + domainSocketDisableIntervalSeconds;
     }
   }
 }
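For illustration only (not part of the patch): a minimal, self-contained sketch of the parse-and-validate pattern the ShortCircuitConf change follows, assuming a plain org.apache.hadoop.conf.Configuration and Guava's Preconditions. The class name is made up; the key string and the 600-second value simply mirror the default introduced above.

import org.apache.hadoop.conf.Configuration;
import com.google.common.base.Preconditions;

public class DisableIntervalConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // A client would normally set this in hdfs-site.xml; done in code here for brevity.
    conf.setLong("dfs.domain.socket.disable.interval.seconds", 600);
    long interval = conf.getLong("dfs.domain.socket.disable.interval.seconds", 600);
    // Same fail-fast rule as the new ShortCircuitConf check: negative values are rejected.
    Preconditions.checkArgument(interval >= 0,
        "dfs.domain.socket.disable.interval.seconds can't be negative.");
    System.out.println("short-circuit disable interval = " + interval + "s");
  }
}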
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
index 57baee112e..25d80fa79f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DomainSocketFactory.java
@@ -92,10 +92,8 @@ public String toString() {
   /**
    * Information about domain socket paths.
    */
-  final Cache<String, PathState> pathMap =
-      CacheBuilder.newBuilder()
-      .expireAfterWrite(10, TimeUnit.MINUTES)
-      .build();
+  private final long pathExpireSeconds;
+  private final Cache<String, PathState> pathMap;
 
   public DomainSocketFactory(ShortCircuitConf conf) {
     final String feature;
@@ -121,6 +119,10 @@ public DomainSocketFactory(ShortCircuitConf conf) {
         LOG.debug(feature + " is enabled.");
       }
     }
+
+    pathExpireSeconds = conf.getDomainSocketDisableIntervalSeconds();
+    pathMap = CacheBuilder.newBuilder()
+        .expireAfterWrite(pathExpireSeconds, TimeUnit.SECONDS).build();
   }
 
   /**
@@ -192,4 +194,8 @@ public void disableDomainSocketPath(String path) {
   public void clearPathMap() {
     pathMap.invalidateAll();
   }
+
+  public long getPathExpireSeconds() {
+    return pathExpireSeconds;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index bd02a9775c..b26652b0be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -278,7 +278,7 @@ public interface ShortCircuitReplicaCreator {
    * Maximum total size of the cache, including both mmapped and
    * non-mmapped elements.
    */
-  private final int maxTotalSize;
+  private int maxTotalSize;
 
   /**
    * Non-mmaped elements older than this will be closed.
@@ -369,6 +369,11 @@ public long getStaleThresholdMs() {
     return staleThresholdMs;
   }
 
+  @VisibleForTesting
+  public void setMaxTotalSize(int maxTotalSize) {
+    this.maxTotalSize = maxTotalSize;
+  }
+
   /**
    * Increment the reference count of a replica, and remove it from any free
    * list it may be in.
@@ -1025,4 +1030,13 @@ public void scheduleSlotReleaser(Slot slot) {
   public DfsClientShmManager getDfsClientShmManager() {
     return shmManager;
   }
+
+  /**
+   * Can be used in testing to verify whether a read went through SCR, after
+   * the read is done and before the stream is closed.
+   */
+  @VisibleForTesting
+  public int getReplicaInfoMapSize() {
+    return replicaInfoMap.size();
+  }
 }
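For illustration only (not part of the patch): a simplified sketch of the disable-then-expire behaviour that DomainSocketFactory now gets from Guava's expireAfterWrite cache. The class name, the Boolean value type, the socket path, and the one-second interval are invented for the example; the real pathMap stores per-path state keyed by socket path and uses the configured dfs.domain.socket.disable.interval.seconds.

import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class PathDisableSketch {
  public static void main(String[] args) throws InterruptedException {
    final long disableIntervalSeconds = 1; // stand-in for the configured interval
    Cache<String, Boolean> disabledPaths = CacheBuilder.newBuilder()
        .expireAfterWrite(disableIntervalSeconds, TimeUnit.SECONDS)
        .build();
    String path = "/var/lib/hadoop-hdfs/dn_socket"; // hypothetical socket path
    disabledPaths.put(path, Boolean.TRUE);                // like disableShortCircuitForPath(path)
    System.out.println(disabledPaths.getIfPresent(path)); // true: SCR is skipped for this path
    Thread.sleep(1500);                                   // wait past the expiry window
    System.out.println(disabledPaths.getIfPresent(path)); // null: SCR will be retried
  }
}

Setting the interval to 0 makes expireAfterWrite treat entries as immediately expired, which is why the BlockReaderFactory warning above reports the path as "not disabled" in that case.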
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7446766605..f30644ccc2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -2603,6 +2603,17 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.domain.socket.disable.interval.seconds</name>
+  <value>600</value>
+  <description>
+    The interval that a DataNode is disabled for future Short-Circuit Reads,
+    after an error happens during a Short-Circuit Read. Setting this to 0 will
+    not disable Short-Circuit Reads at all after errors happen. Negative values
+    are invalid.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.read.shortcircuit.skip.checksum</name>
   <value>false</value>
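For illustration only (not part of the patch): a hedged sketch of how a client might override the new property programmatically, for example to keep short-circuit reads enabled even after an error. The class name, NameNode URI, and socket path are placeholders; in practice these settings usually live in hdfs-site.xml.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ScrClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("dfs.client.read.shortcircuit", true);
    conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket"); // placeholder
    // 0 = never disable short-circuit reads after an error; the default is 600 seconds.
    conf.setLong("dfs.domain.socket.disable.interval.seconds", 0);
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf)) {
      System.out.println("connected to " + fs.getUri());
    }
  }
}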
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
index 2d0454910c..42a7310644 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java
@@ -22,6 +22,8 @@
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY;
 import static org.hamcrest.CoreMatchers.equalTo;
 
 import java.io.File;
@@ -36,9 +38,8 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
@@ -54,18 +55,29 @@
 import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.Visitor;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.hamcrest.CoreMatchers;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestBlockReaderFactory {
-  static final Log LOG = LogFactory.getLog(TestBlockReaderFactory.class);
+  static final Logger LOG =
+      LoggerFactory.getLogger(TestBlockReaderFactory.class);
+
+  @Rule
+  public final Timeout globalTimeout = new Timeout(180000);
 
   @Before
   public void init() {
@@ -209,7 +221,7 @@ public void run() {
    * occurs); however, the failure result should not be cached. We want
    * to be able to retry later and succeed.
    */
-  @Test(timeout=60000)
+  @Test
   public void testShortCircuitCacheTemporaryFailure()
       throws Exception {
     BlockReaderTestUtil.enableBlockReaderFactoryTracing();
@@ -302,7 +314,96 @@ public void run() {
     Assert.assertFalse(testFailed.get());
   }
 
-  /**
+  /**
+   * Test that by default, reads after a failure do not go through SCR.
+   */
+  @Test
+  public void testShortCircuitCacheUnbufferDefault() throws Exception {
+    testShortCircuitCacheUnbufferWithDisableInterval(
+        DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT, true);
+  }
+  /**
+   * Test that if we disable the cache in
+   * {@link org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory}, reads
+   * after a failure still go through SCR.
+   */
+  @Test
+  public void testShortCircuitCacheUnbufferDisabled() throws Exception {
+    testShortCircuitCacheUnbufferWithDisableInterval(0, false);
+  }
+
+  private void testShortCircuitCacheUnbufferWithDisableInterval(
+      final long interval, final boolean disabled) throws Exception {
+    final String testName = GenericTestUtils.getMethodName();
+    BlockReaderTestUtil.enableBlockReaderFactoryTracing();
+    try (TemporarySocketDirectory sockDir = new TemporarySocketDirectory()) {
+      Configuration conf = createShortCircuitConf(testName, sockDir);
+      conf.set(DFS_CLIENT_CONTEXT, testName + interval + disabled);
+      conf.setLong(DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY, interval);
+      Configuration serverConf = new Configuration(conf);
+      MiniDFSCluster.Builder builder =
+          new MiniDFSCluster.Builder(serverConf).numDataNodes(1);
+      try (MiniDFSCluster cluster = builder.build();
+          DistributedFileSystem dfs = (DistributedFileSystem) FileSystem
+              .get(cluster.getURI(0), conf)) {
+        cluster.waitActive();
+        final Path testFile = new Path("/test_file");
+        final int testFileLen = 4000;
+        final int seed = 0xFADED;
+        DFSTestUtil.createFile(dfs, testFile, testFileLen, (short) 1, seed);
+        final byte[] expected = DFSTestUtil.
+            calculateFileContentsFromSeed(seed, testFileLen);
+
+        try (FSDataInputStream in = dfs.open(testFile)) {
+          Assert.assertEquals(0,
+              dfs.getClient().getClientContext().getShortCircuitCache()
+                  .getReplicaInfoMapSize());
+
+          final byte[] buf = new byte[testFileLen];
+          IOUtils.readFully(in, buf, 0, testFileLen);
+          validateReadResult(dfs, expected, buf, 1);
+
+          // Set cache size to 0 so the replica marked evictable by unbuffer
+          // will be purged immediately.
+          dfs.getClient().getClientContext().getShortCircuitCache()
+              .setMaxTotalSize(0);
+          LOG.info("Unbuffering");
+          in.unbuffer();
+          Assert.assertEquals(0,
+              dfs.getClient().getClientContext().getShortCircuitCache()
+                  .getReplicaInfoMapSize());
+
+          DFSTestUtil.appendFile(dfs, testFile, "append more data");
+
+          // This read will force a new replica read via TCP.
+          Arrays.fill(buf, (byte) 0);
+          in.seek(0);
+          IOUtils.readFully(in, buf, 0, testFileLen);
+          validateReadResult(dfs, expected, buf, 0);
+        }
+
+        LOG.info("Reading {} again.", testFile);
+        try (FSDataInputStream in = dfs.open(testFile)) {
+          final byte[] buf = new byte[testFileLen];
+          Arrays.fill(buf, (byte) 0);
+          IOUtils.readFully(in, buf, 0, testFileLen);
+          final int expectedMapSize = disabled ? 0 : 1;
+          validateReadResult(dfs, expected, buf, expectedMapSize);
+        }
+      }
+    }
+  }
+
+  private void validateReadResult(final DistributedFileSystem dfs,
+      final byte[] expected, final byte[] actual,
+      final int expectedScrRepMapSize) {
+    Assert.assertThat(expected, CoreMatchers.is(actual));
+    Assert.assertEquals(expectedScrRepMapSize,
+        dfs.getClient().getClientContext().getShortCircuitCache()
+            .getReplicaInfoMapSize());
+  }
+
+  /**
    * Test that a client which supports short-circuit reads using
    * shared memory can fall back to not using shared memory when
    * the server doesn't support it.