From 240cba7e6d8fa528a48ff04766b5b3c9b23a173e Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Mon, 18 May 2020 09:22:15 -0700 Subject: [PATCH] HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016) (cherry picked from commit 2abcf7762ae74b936e1cedb60d5d2b4cc4ee86ea) --- .../org/apache/hadoop/hdfs/ClientContext.java | 20 +- .../hdfs/client/HdfsClientConfigKeys.java | 2 + .../hdfs/client/impl/BlockReaderFactory.java | 9 +- .../hdfs/client/impl/DfsClientConf.java | 21 +- .../src/main/resources/hdfs-default.xml | 10 + .../fs/TestEnhancedByteBufferAccess.java | 6 +- .../client/impl/TestBlockReaderFactory.java | 14 +- .../client/impl/TestBlockReaderLocal.java | 187 +++++++++++++----- .../shortcircuit/TestShortCircuitCache.java | 19 +- 9 files changed, 213 insertions(+), 75 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index cbd941b6b9..7a03240e80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -77,7 +77,7 @@ public class ClientContext { /** * Caches short-circuit file descriptors, mmap regions. */ - private final ShortCircuitCache shortCircuitCache; + private final ShortCircuitCache[] shortCircuitCache; /** * Caches TCP and UNIX domain sockets for reuse. @@ -132,13 +132,23 @@ public class ClientContext { */ private DeadNodeDetector deadNodeDetector = null; + /** + * ShortCircuitCache array size. + */ + private final int clientShortCircuitNum; + private ClientContext(String name, DfsClientConf conf, Configuration config) { final ShortCircuitConf scConf = conf.getShortCircuitConf(); this.name = name; this.confString = scConf.confAsString(); - this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); + this.clientShortCircuitNum = conf.getClientShortCircuitNum(); + this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum]; + for (int i = 0; i < this.clientShortCircuitNum; i++) { + this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf); + } + this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), scConf.getSocketCacheExpiry()); this.keyProviderCache = new KeyProviderCache( @@ -228,7 +238,11 @@ public String getConfString() { } public ShortCircuitCache getShortCircuitCache() { - return shortCircuitCache; + return shortCircuitCache[0]; + } + + public ShortCircuitCache getShortCircuitCache(long idx) { + return shortCircuitCache[(int) (idx % clientShortCircuitNum)]; } public PeerCache getPeerCache() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index efc2766e9e..0c35c8d85e 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -144,6 +144,8 @@ public interface HdfsClientConfigKeys { "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms"; int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000; + String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num"; + int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1; String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY = "dfs.client.slow.io.warning.threshold.ms"; long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java index a3b611c1ca..028d6296e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -476,7 +476,8 @@ private BlockReader getBlockReaderLocal() throws IOException { "giving up on BlockReaderLocal.", this, pathInfo); return null; } - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = + clientContext.getShortCircuitCache(block.getBlockId()); ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); @@ -527,7 +528,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; Slot slot = null; - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = + clientContext.getShortCircuitCache(block.getBlockId()); try { MutableBoolean usedPeer = new MutableBoolean(false); slot = cache.allocShmSlot(datanode, peer, usedPeer, @@ -582,7 +584,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { */ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, Slot slot) throws IOException { - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = + clientContext.getShortCircuitCache(block.getBlockId()); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE)); SlotId slotId = slot == null ? null : slot.getSlotId(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 918fef7e50..e41b608b5b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -142,6 +142,7 @@ public class DfsClientConf { private final long refreshReadBlockLocationsMS; private final ShortCircuitConf shortCircuitConf; + private final int clientShortCircuitNum; private final long hedgedReadThresholdMillis; private final int hedgedReadThreadpoolSize; @@ -272,8 +273,6 @@ public DfsClientConf(Configuration conf) { HdfsClientConfigKeys. DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT); - shortCircuitConf = new ShortCircuitConf(conf); - hedgedReadThresholdMillis = conf.getLong( HedgedRead.THRESHOLD_MILLIS_KEY, HedgedRead.THRESHOLD_MILLIS_DEFAULT); @@ -296,6 +295,17 @@ public DfsClientConf(Configuration conf) { leaseHardLimitPeriod = conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY, HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000; + + shortCircuitConf = new ShortCircuitConf(conf); + clientShortCircuitNum = conf.getInt( + HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM, + HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT); + Preconditions.checkArgument(clientShortCircuitNum >= 1, + HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM + + "can't be less then 1."); + Preconditions.checkArgument(clientShortCircuitNum <= 5, + HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM + + "can't be more then 5."); } @SuppressWarnings("unchecked") @@ -601,6 +611,13 @@ public long getSlowIoWarningThresholdMs() { return slowIoWarningThresholdMs; } + /* + * @return the clientShortCircuitNum + */ + public int getClientShortCircuitNum() { + return clientShortCircuitNum; + } + /** * @return the hedgedReadThresholdMillis */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 00b4587ae3..7c42e0d879 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4188,6 +4188,16 @@ + + dfs.client.short.circuit.num + 1 + + Number of short-circuit caches. This setting should + be in the range 1 - 5. Lower values will result in lower CPU consumption; higher + values may speed up massive parallel reading files. + + + dfs.client.read.striped.threadpool.size 18 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index 90b4f11a66..19bc71111e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -358,7 +358,7 @@ public void testZeroCopyMmapCache() throws Exception { fsIn.close(); fsIn = fs.open(TEST_PATH); final ShortCircuitCache cache = ClientContext.get( - CONTEXT, conf).getShortCircuitCache(); + CONTEXT, conf).getShortCircuitCache(0); cache.accept(new CountingVisitor(0, 5, 5, 0)); results[0] = fsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); @@ -654,12 +654,12 @@ public void testZeroCopyReadOfCachedData() throws Exception { BLOCK_SIZE), byteBufferToArray(result2)); fsIn2.releaseBuffer(result2); fsIn2.close(); - + // check that the replica is anchored final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); final ShortCircuitCache cache = ClientContext.get( - CONTEXT, conf).getShortCircuitCache(); + CONTEXT, conf).getShortCircuitCache(0); waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1); // Uncache the replica fs.removeCacheDirective(directiveId); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java index 6b04b14f49..8442449446 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java @@ -389,7 +389,7 @@ private void testShortCircuitCacheUnbufferWithDisableInterval( try (FSDataInputStream in = dfs.open(testFile)) { Assert.assertEquals(0, - dfs.getClient().getClientContext().getShortCircuitCache() + dfs.getClient().getClientContext().getShortCircuitCache(0) .getReplicaInfoMapSize()); final byte[] buf = new byte[testFileLen]; @@ -398,12 +398,12 @@ private void testShortCircuitCacheUnbufferWithDisableInterval( // Set cache size to 0 so the replica marked evictable by unbuffer // will be purged immediately. - dfs.getClient().getClientContext().getShortCircuitCache() + dfs.getClient().getClientContext().getShortCircuitCache(0) .setMaxTotalSize(0); LOG.info("Unbuffering"); in.unbuffer(); Assert.assertEquals(0, - dfs.getClient().getClientContext().getShortCircuitCache() + dfs.getClient().getClientContext().getShortCircuitCache(0) .getReplicaInfoMapSize()); DFSTestUtil.appendFile(dfs, testFile, "append more data"); @@ -432,7 +432,7 @@ private void validateReadResult(final DistributedFileSystem dfs, final int expectedScrRepMapSize) { Assert.assertThat(expected, CoreMatchers.is(actual)); Assert.assertEquals(expectedScrRepMapSize, - dfs.getClient().getClientContext().getShortCircuitCache() + dfs.getClient().getClientContext().getShortCircuitCache(0) .getReplicaInfoMapSize()); } @@ -467,7 +467,7 @@ public void testShortCircuitReadFromServerWithoutShm() throws Exception { calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); Assert.assertTrue(Arrays.equals(contents, expected)); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); final DatanodeInfo datanode = new DatanodeInfoBuilder() .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()) .build(); @@ -516,7 +516,7 @@ public void testShortCircuitReadFromClientWithoutShm() throws Exception { calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); Assert.assertTrue(Arrays.equals(contents, expected)); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); Assert.assertEquals(null, cache.getDfsClientShmManager()); cluster.shutdown(); sockDir.close(); @@ -548,7 +548,7 @@ public void testShortCircuitCacheShutdown() throws Exception { calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); Assert.assertTrue(Arrays.equals(contents, expected)); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); cache.close(); Assert.assertTrue(cache.getDfsClientShmManager(). getDomainSocketWatcher().isClosed()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java index 95fb67a1a4..534243ddf3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java @@ -116,7 +116,7 @@ private static void readFully(BlockReaderLocal reader, } private static class BlockReaderLocalTest { - final static int TEST_LENGTH = 12345; + final static int TEST_LENGTH = 1234567; final static int BYTES_PER_CHECKSUM = 512; public void setConfiguration(HdfsConfiguration conf) { @@ -130,10 +130,14 @@ public void doTest(BlockReaderLocal reader, byte original[]) throws IOException { // default: no-op } - } + public void doTest(BlockReaderLocal reader, byte[] original, int shift) + throws IOException { + // default: no-op + } } public void runBlockReaderLocalTest(BlockReaderLocalTest test, - boolean checksum, long readahead) throws IOException { + boolean checksum, long readahead, int shortCircuitCachesNum) + throws IOException { Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); MiniDFSCluster cluster = null; HdfsConfiguration conf = new HdfsConfiguration(); @@ -143,10 +147,13 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test, BlockReaderLocalTest.BYTES_PER_CHECKSUM); conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead); + conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM, + shortCircuitCachesNum); test.setConfiguration(conf); FileInputStream dataIn = null, metaIn = null; final Path TEST_PATH = new Path("/a"); final long RANDOM_SEED = 4567L; + final int blockSize = 10 * 1024; BlockReaderLocal blockReaderLocal = null; FSDataInputStream fsIn = null; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; @@ -158,8 +165,8 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test, cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); + DFSTestUtil.createFile(fs, TEST_PATH, 1024, + BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED); try { DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); } catch (InterruptedException e) { @@ -174,47 +181,52 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test, BlockReaderLocalTest.TEST_LENGTH); fsIn.close(); fsIn = null; - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH); - File dataFile = cluster.getBlockFile(0, block); - File metaFile = cluster.getBlockMetadataFile(0, block); + for (int i = 0; i < shortCircuitCachesNum; i++) { + ExtendedBlock block = DFSTestUtil.getAllBlocks( + fs, TEST_PATH).get(i).getBlock(); + File dataFile = cluster.getBlockFile(0, block); + File metaFile = cluster.getBlockMetadataFile(0, block); - ShortCircuitCache shortCircuitCache = - ClientContext.getFromConf(conf).getShortCircuitCache(); + ShortCircuitCache shortCircuitCache = + ClientContext.getFromConf(conf).getShortCircuitCache( + block.getBlockId()); + test.setup(dataFile, checksum); + FileInputStream[] streams = { + new FileInputStream(dataFile), + new FileInputStream(metaFile) + }; + dataIn = streams[0]; + metaIn = streams[1]; + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), + block.getBlockPoolId()); + raf = new RandomAccessFile( + new File(sockDir.getDir().getAbsolutePath(), + UUID.randomUUID().toString()), "rw"); + raf.setLength(8192); + FileInputStream shmStream = new FileInputStream(raf.getFD()); + shm = new ShortCircuitShm(ShmId.createRandom(), shmStream); + ShortCircuitReplica replica = + new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache, + Time.now(), shm.allocAndRegisterSlot( + ExtendedBlockId.fromExtendedBlock(block))); + blockReaderLocal = new BlockReaderLocal.Builder( + new DfsClientConf.ShortCircuitConf(conf)). + setFilename(TEST_PATH.getName()). + setBlock(block). + setShortCircuitReplica(replica). + setCachingStrategy(new CachingStrategy(false, readahead)). + setVerifyChecksum(checksum). + build(); + dataIn = null; + metaIn = null; + test.doTest(blockReaderLocal, original, i * blockSize); + // BlockReaderLocal should not alter the file position. + Assert.assertEquals(0, streams[0].getChannel().position()); + Assert.assertEquals(0, streams[1].getChannel().position()); + } cluster.shutdown(); cluster = null; - test.setup(dataFile, checksum); - FileInputStream streams[] = { - new FileInputStream(dataFile), - new FileInputStream(metaFile) - }; - dataIn = streams[0]; - metaIn = streams[1]; - ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), - block.getBlockPoolId()); - raf = new RandomAccessFile( - new File(sockDir.getDir().getAbsolutePath(), - UUID.randomUUID().toString()), "rw"); - raf.setLength(8192); - FileInputStream shmStream = new FileInputStream(raf.getFD()); - shm = new ShortCircuitShm(ShmId.createRandom(), shmStream); - ShortCircuitReplica replica = - new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache, - Time.now(), shm.allocAndRegisterSlot( - ExtendedBlockId.fromExtendedBlock(block))); - blockReaderLocal = new BlockReaderLocal.Builder( - new DfsClientConf.ShortCircuitConf(conf)). - setFilename(TEST_PATH.getName()). - setBlock(block). - setShortCircuitReplica(replica). - setCachingStrategy(new CachingStrategy(false, readahead)). - setVerifyChecksum(checksum). - build(); - dataIn = null; - metaIn = null; - test.doTest(blockReaderLocal, original); - // BlockReaderLocal should not alter the file position. - Assert.assertEquals(0, streams[0].getChannel().position()); - Assert.assertEquals(0, streams[1].getChannel().position()); + } finally { if (fsIn != null) fsIn.close(); if (fs != null) fs.close(); @@ -227,6 +239,11 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test, } } + public void runBlockReaderLocalTest(BlockReaderLocalTest test, + boolean checksum, long readahead) throws IOException { + runBlockReaderLocalTest(test, checksum, readahead, 1); + } + private static class TestBlockReaderLocalImmediateClose extends BlockReaderLocalTest { } @@ -242,7 +259,7 @@ private static class TestBlockReaderSimpleReads @Override public void doTest(BlockReaderLocal reader, byte original[]) throws IOException { - byte buf[] = new byte[TEST_LENGTH]; + byte[] buf = new byte[TEST_LENGTH]; reader.readFully(buf, 0, 512); assertArrayRegionsEqual(original, 0, buf, 0, 512); reader.readFully(buf, 512, 512); @@ -291,7 +308,7 @@ private static class TestBlockReaderLocalArrayReads2 @Override public void doTest(BlockReaderLocal reader, byte original[]) throws IOException { - byte buf[] = new byte[TEST_LENGTH]; + byte[] buf = new byte[TEST_LENGTH]; reader.readFully(buf, 0, 10); assertArrayRegionsEqual(original, 0, buf, 0, 10); reader.readFully(buf, 10, 100); @@ -369,7 +386,7 @@ public void testBlockReaderLocalByteBufferReadsNoChecksum() public void testBlockReaderLocalByteBufferReadsNoReadahead() throws IOException { runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), - true, 0); + true, 0); } @Test @@ -468,7 +485,7 @@ public void setup(File blockFile, boolean usingChecksums) public void doTest(BlockReaderLocal reader, byte original[]) throws IOException { - byte buf[] = new byte[TEST_LENGTH]; + byte[] buf = new byte[TEST_LENGTH]; if (usingChecksums) { try { reader.readFully(buf, 0, 10); @@ -508,7 +525,7 @@ public void setup(File blockFile, boolean usingChecksums) public void doTest(BlockReaderLocal reader, byte original[]) throws IOException { - byte buf[] = new byte[TEST_LENGTH]; + byte[] buf = new byte[TEST_LENGTH]; try { reader.readFully(buf, 0, 10); assertArrayRegionsEqual(original, 0, buf, 0, 10); @@ -845,4 +862,78 @@ public void testStatisticsForErasureCodingRead() throws IOException { } } } + + private static class TestBlockReaderFiveShortCircutCachesReads + extends BlockReaderLocalTest { + @Override + public void doTest(BlockReaderLocal reader, byte[] original, int shift) + throws IOException { + byte[] buf = new byte[TEST_LENGTH]; + reader.readFully(buf, 0, 512); + assertArrayRegionsEqual(original, shift, buf, 0, 512); + reader.readFully(buf, 512, 512); + assertArrayRegionsEqual(original, 512 + shift, buf, 512, 512); + reader.readFully(buf, 1024, 513); + assertArrayRegionsEqual(original, 1024 + shift, buf, 1024, 513); + reader.readFully(buf, 1537, 514); + assertArrayRegionsEqual(original, 1537 + shift, buf, 1537, 514); + // Readahead is always at least the size of one chunk in this test. + Assert.assertTrue(reader.getMaxReadaheadLength() >= + BlockReaderLocalTest.BYTES_PER_CHECKSUM); + } + } + + @Test + public void testBlockReaderFiveShortCircutCachesReads() throws IOException { + runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), + true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, + 5); + } + + @Test + public void testBlockReaderFiveShortCircutCachesReadsShortReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), + true, BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1, + 5); + } + + @Test + public void testBlockReaderFiveShortCircutCachesReadsNoChecksum() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), + false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, + 5); + } + + @Test + public void testBlockReaderFiveShortCircutCachesReadsNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), + true, 0, 5); + } + + @Test + public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), + false, 0, 5); + } + + @Test(expected = IllegalArgumentException.class) + public void testBlockReaderShortCircutCachesOutOfRangeBelow() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), + true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, + 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testBlockReaderShortCircutCachesOutOfRangeAbove() + throws IOException { + runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), + true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, + 555); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index b2da6a2fca..53cac2adee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -431,7 +431,7 @@ public void testAllocShm() throws Exception { cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); cache.getDfsClientShmManager().visit(new Visitor() { @Override public void visit(HashMap info) @@ -501,7 +501,7 @@ public void testShmBasedStaleness() throws Exception { cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); String TEST_FILE = "/test_file"; final int TEST_FILE_LEN = 8193; final int SEED = 0xFADED; @@ -565,7 +565,7 @@ public void testUnlinkingReplicasInFileDescriptorCache() throws Exception { cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); cache.getDfsClientShmManager().visit(new Visitor() { @Override public void visit(HashMap info) @@ -877,19 +877,20 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception { return peerCache; }); - Mockito.when(clientContext.getShortCircuitCache()).thenAnswer( + Mockito.when(clientContext.getShortCircuitCache( + blk.getBlock().getBlockId())).thenAnswer( (Answer) shortCircuitCacheCall -> { - ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class); - Mockito.when(cache.allocShmSlot( + ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class); + Mockito.when(cache.allocShmSlot( Mockito.any(DatanodeInfo.class), Mockito.any(DomainPeer.class), Mockito.any(MutableBoolean.class), Mockito.any(ExtendedBlockId.class), Mockito.anyString())) - .thenAnswer((Answer) call -> null); + .thenAnswer((Answer) call -> null); - return cache; - } + return cache; + } ); DatanodeInfo[] nodes = blk.getLocations();