HDFS-15202 Boost short circuit cache (rebase PR-1884) (#2016)

Added parameter dfs.client.short.circuit.num improving HDFS-client's massive reading performance by create few instances ShortCircuit caches instead of one. It helps avoid locks and lets CPU do job.

(cherry picked from commit 86e6aa8eec)
This commit is contained in:
pustota2009 2020-05-18 17:04:04 +03:00 committed by Wei-Chiu Chuang
parent 77587ffb1e
commit ad9a6a0ee3
9 changed files with 214 additions and 75 deletions

View File

@ -77,7 +77,7 @@ public class ClientContext {
/** /**
* Caches short-circuit file descriptors, mmap regions. * Caches short-circuit file descriptors, mmap regions.
*/ */
private final ShortCircuitCache shortCircuitCache; private final ShortCircuitCache[] shortCircuitCache;
/** /**
* Caches TCP and UNIX domain sockets for reuse. * Caches TCP and UNIX domain sockets for reuse.
@ -132,13 +132,23 @@ public class ClientContext {
*/ */
private DeadNodeDetector deadNodeDetector = null; private DeadNodeDetector deadNodeDetector = null;
/**
* ShortCircuitCache array size.
*/
private final int clientShortCircuitNum;
private ClientContext(String name, DfsClientConf conf, private ClientContext(String name, DfsClientConf conf,
Configuration config) { Configuration config) {
final ShortCircuitConf scConf = conf.getShortCircuitConf(); final ShortCircuitConf scConf = conf.getShortCircuitConf();
this.name = name; this.name = name;
this.confString = scConf.confAsString(); this.confString = scConf.confAsString();
this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); this.clientShortCircuitNum = conf.getClientShortCircuitNum();
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
for (int i = 0; i < this.clientShortCircuitNum; i++) {
this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
}
this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
scConf.getSocketCacheExpiry()); scConf.getSocketCacheExpiry());
this.keyProviderCache = new KeyProviderCache( this.keyProviderCache = new KeyProviderCache(
@ -228,7 +238,11 @@ public String getConfString() {
} }
public ShortCircuitCache getShortCircuitCache() { public ShortCircuitCache getShortCircuitCache() {
return shortCircuitCache; return shortCircuitCache[0];
}
public ShortCircuitCache getShortCircuitCache(long idx) {
return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
} }
public PeerCache getPeerCache() { public PeerCache getPeerCache() {

View File

@ -144,6 +144,8 @@ public interface HdfsClientConfigKeys {
"dfs.short.circuit.shared.memory.watcher.interrupt.check.ms"; "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
60000; 60000;
String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;
String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY = String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
"dfs.client.slow.io.warning.threshold.ms"; "dfs.client.slow.io.warning.threshold.ms";
long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000; long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;

View File

@ -476,7 +476,8 @@ private BlockReader getBlockReaderLocal() throws IOException {
"giving up on BlockReaderLocal.", this, pathInfo); "giving up on BlockReaderLocal.", this, pathInfo);
return null; return null;
} }
ShortCircuitCache cache = clientContext.getShortCircuitCache(); ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId()); block.getBlockPoolId());
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
@ -527,7 +528,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
if (curPeer.fromCache) remainingCacheTries--; if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer; DomainPeer peer = (DomainPeer)curPeer.peer;
Slot slot = null; Slot slot = null;
ShortCircuitCache cache = clientContext.getShortCircuitCache(); ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
try { try {
MutableBoolean usedPeer = new MutableBoolean(false); MutableBoolean usedPeer = new MutableBoolean(false);
slot = cache.allocShmSlot(datanode, peer, usedPeer, slot = cache.allocShmSlot(datanode, peer, usedPeer,
@ -582,7 +584,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
*/ */
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
Slot slot) throws IOException { Slot slot) throws IOException {
ShortCircuitCache cache = clientContext.getShortCircuitCache(); ShortCircuitCache cache =
clientContext.getShortCircuitCache(block.getBlockId());
final DataOutputStream out = final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE)); new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
SlotId slotId = slot == null ? null : slot.getSlotId(); SlotId slotId = slot == null ? null : slot.getSlotId();

View File

@ -142,6 +142,7 @@ public class DfsClientConf {
private final long refreshReadBlockLocationsMS; private final long refreshReadBlockLocationsMS;
private final ShortCircuitConf shortCircuitConf; private final ShortCircuitConf shortCircuitConf;
private final int clientShortCircuitNum;
private final long hedgedReadThresholdMillis; private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize; private final int hedgedReadThreadpoolSize;
@ -272,8 +273,6 @@ public DfsClientConf(Configuration conf) {
HdfsClientConfigKeys. HdfsClientConfigKeys.
DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT); DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);
shortCircuitConf = new ShortCircuitConf(conf);
hedgedReadThresholdMillis = conf.getLong( hedgedReadThresholdMillis = conf.getLong(
HedgedRead.THRESHOLD_MILLIS_KEY, HedgedRead.THRESHOLD_MILLIS_KEY,
HedgedRead.THRESHOLD_MILLIS_DEFAULT); HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@ -296,6 +295,17 @@ public DfsClientConf(Configuration conf) {
leaseHardLimitPeriod = leaseHardLimitPeriod =
conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY, conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000; HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;
shortCircuitConf = new ShortCircuitConf(conf);
clientShortCircuitNum = conf.getInt(
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
Preconditions.checkArgument(clientShortCircuitNum >= 1,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
"can't be less then 1.");
Preconditions.checkArgument(clientShortCircuitNum <= 5,
HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
"can't be more then 5.");
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -601,6 +611,13 @@ public long getSlowIoWarningThresholdMs() {
return slowIoWarningThresholdMs; return slowIoWarningThresholdMs;
} }
/*
* @return the clientShortCircuitNum
*/
public int getClientShortCircuitNum() {
return clientShortCircuitNum;
}
/** /**
* @return the hedgedReadThresholdMillis * @return the hedgedReadThresholdMillis
*/ */

View File

@ -4178,6 +4178,16 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.short.circuit.num</name>
<value>1</value>
<description>
Number of short-circuit caches. This setting should
be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
values may speed up massive parallel reading files.
</description>
</property>
<property> <property>
<name>dfs.client.read.striped.threadpool.size</name> <name>dfs.client.read.striped.threadpool.size</name>
<value>18</value> <value>18</value>

View File

@ -358,7 +358,7 @@ public void testZeroCopyMmapCache() throws Exception {
fsIn.close(); fsIn.close();
fsIn = fs.open(TEST_PATH); fsIn = fs.open(TEST_PATH);
final ShortCircuitCache cache = ClientContext.get( final ShortCircuitCache cache = ClientContext.get(
CONTEXT, conf).getShortCircuitCache(); CONTEXT, conf).getShortCircuitCache(0);
cache.accept(new CountingVisitor(0, 5, 5, 0)); cache.accept(new CountingVisitor(0, 5, 5, 0));
results[0] = fsIn.read(null, BLOCK_SIZE, results[0] = fsIn.read(null, BLOCK_SIZE,
EnumSet.of(ReadOption.SKIP_CHECKSUMS)); EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@ -659,7 +659,7 @@ public void testZeroCopyReadOfCachedData() throws Exception {
final ExtendedBlock firstBlock = final ExtendedBlock firstBlock =
DFSTestUtil.getFirstBlock(fs, TEST_PATH); DFSTestUtil.getFirstBlock(fs, TEST_PATH);
final ShortCircuitCache cache = ClientContext.get( final ShortCircuitCache cache = ClientContext.get(
CONTEXT, conf).getShortCircuitCache(); CONTEXT, conf).getShortCircuitCache(0);
waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1); waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
// Uncache the replica // Uncache the replica
fs.removeCacheDirective(directiveId); fs.removeCacheDirective(directiveId);

View File

@ -389,7 +389,7 @@ private void testShortCircuitCacheUnbufferWithDisableInterval(
try (FSDataInputStream in = dfs.open(testFile)) { try (FSDataInputStream in = dfs.open(testFile)) {
Assert.assertEquals(0, Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache() dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize()); .getReplicaInfoMapSize());
final byte[] buf = new byte[testFileLen]; final byte[] buf = new byte[testFileLen];
@ -398,12 +398,12 @@ private void testShortCircuitCacheUnbufferWithDisableInterval(
// Set cache size to 0 so the replica marked evictable by unbuffer // Set cache size to 0 so the replica marked evictable by unbuffer
// will be purged immediately. // will be purged immediately.
dfs.getClient().getClientContext().getShortCircuitCache() dfs.getClient().getClientContext().getShortCircuitCache(0)
.setMaxTotalSize(0); .setMaxTotalSize(0);
LOG.info("Unbuffering"); LOG.info("Unbuffering");
in.unbuffer(); in.unbuffer();
Assert.assertEquals(0, Assert.assertEquals(0,
dfs.getClient().getClientContext().getShortCircuitCache() dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize()); .getReplicaInfoMapSize());
DFSTestUtil.appendFile(dfs, testFile, "append more data"); DFSTestUtil.appendFile(dfs, testFile, "append more data");
@ -432,7 +432,7 @@ private void validateReadResult(final DistributedFileSystem dfs,
final int expectedScrRepMapSize) { final int expectedScrRepMapSize) {
Assert.assertThat(expected, CoreMatchers.is(actual)); Assert.assertThat(expected, CoreMatchers.is(actual));
Assert.assertEquals(expectedScrRepMapSize, Assert.assertEquals(expectedScrRepMapSize,
dfs.getClient().getClientContext().getShortCircuitCache() dfs.getClient().getClientContext().getShortCircuitCache(0)
.getReplicaInfoMapSize()); .getReplicaInfoMapSize());
} }
@ -467,7 +467,7 @@ public void testShortCircuitReadFromServerWithoutShm() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected)); Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
final DatanodeInfo datanode = new DatanodeInfoBuilder() final DatanodeInfo datanode = new DatanodeInfoBuilder()
.setNodeID(cluster.getDataNodes().get(0).getDatanodeId()) .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
.build(); .build();
@ -516,7 +516,7 @@ public void testShortCircuitReadFromClientWithoutShm() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected)); Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
Assert.assertEquals(null, cache.getDfsClientShmManager()); Assert.assertEquals(null, cache.getDfsClientShmManager());
cluster.shutdown(); cluster.shutdown();
sockDir.close(); sockDir.close();
@ -548,7 +548,7 @@ public void testShortCircuitCacheShutdown() throws Exception {
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected)); Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
cache.close(); cache.close();
Assert.assertTrue(cache.getDfsClientShmManager(). Assert.assertTrue(cache.getDfsClientShmManager().
getDomainSocketWatcher().isClosed()); getDomainSocketWatcher().isClosed());

View File

@ -116,7 +116,7 @@ private static void readFully(BlockReaderLocal reader,
} }
private static class BlockReaderLocalTest { private static class BlockReaderLocalTest {
final static int TEST_LENGTH = 12345; final static int TEST_LENGTH = 1234567;
final static int BYTES_PER_CHECKSUM = 512; final static int BYTES_PER_CHECKSUM = 512;
public void setConfiguration(HdfsConfiguration conf) { public void setConfiguration(HdfsConfiguration conf) {
@ -130,10 +130,14 @@ public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
// default: no-op // default: no-op
} }
} public void doTest(BlockReaderLocal reader, byte[] original, int shift)
throws IOException {
// default: no-op
} }
public void runBlockReaderLocalTest(BlockReaderLocalTest test, public void runBlockReaderLocalTest(BlockReaderLocalTest test,
boolean checksum, long readahead) throws IOException { boolean checksum, long readahead, int shortCircuitCachesNum)
throws IOException {
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
@ -143,10 +147,13 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
BlockReaderLocalTest.BYTES_PER_CHECKSUM); BlockReaderLocalTest.BYTES_PER_CHECKSUM);
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead); conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
shortCircuitCachesNum);
test.setConfiguration(conf); test.setConfiguration(conf);
FileInputStream dataIn = null, metaIn = null; FileInputStream dataIn = null, metaIn = null;
final Path TEST_PATH = new Path("/a"); final Path TEST_PATH = new Path("/a");
final long RANDOM_SEED = 4567L; final long RANDOM_SEED = 4567L;
final int blockSize = 10 * 1024;
BlockReaderLocal blockReaderLocal = null; BlockReaderLocal blockReaderLocal = null;
FSDataInputStream fsIn = null; FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
@ -158,8 +165,8 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(); cluster.waitActive();
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH, DFSTestUtil.createFile(fs, TEST_PATH, 1024,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED);
try { try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -174,47 +181,52 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
BlockReaderLocalTest.TEST_LENGTH); BlockReaderLocalTest.TEST_LENGTH);
fsIn.close(); fsIn.close();
fsIn = null; fsIn = null;
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH); for (int i = 0; i < shortCircuitCachesNum; i++) {
File dataFile = cluster.getBlockFile(0, block); ExtendedBlock block = DFSTestUtil.getAllBlocks(
File metaFile = cluster.getBlockMetadataFile(0, block); fs, TEST_PATH).get(i).getBlock();
File dataFile = cluster.getBlockFile(0, block);
File metaFile = cluster.getBlockMetadataFile(0, block);
ShortCircuitCache shortCircuitCache = ShortCircuitCache shortCircuitCache =
ClientContext.getFromConf(conf).getShortCircuitCache(); ClientContext.getFromConf(conf).getShortCircuitCache(
block.getBlockId());
test.setup(dataFile, checksum);
FileInputStream[] streams = {
new FileInputStream(dataFile),
new FileInputStream(metaFile)
};
dataIn = streams[0];
metaIn = streams[1];
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId());
raf = new RandomAccessFile(
new File(sockDir.getDir().getAbsolutePath(),
UUID.randomUUID().toString()), "rw");
raf.setLength(8192);
FileInputStream shmStream = new FileInputStream(raf.getFD());
shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
ShortCircuitReplica replica =
new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
Time.now(), shm.allocAndRegisterSlot(
ExtendedBlockId.fromExtendedBlock(block)));
blockReaderLocal = new BlockReaderLocal.Builder(
new DfsClientConf.ShortCircuitConf(conf)).
setFilename(TEST_PATH.getName()).
setBlock(block).
setShortCircuitReplica(replica).
setCachingStrategy(new CachingStrategy(false, readahead)).
setVerifyChecksum(checksum).
build();
dataIn = null;
metaIn = null;
test.doTest(blockReaderLocal, original, i * blockSize);
// BlockReaderLocal should not alter the file position.
Assert.assertEquals(0, streams[0].getChannel().position());
Assert.assertEquals(0, streams[1].getChannel().position());
}
cluster.shutdown(); cluster.shutdown();
cluster = null; cluster = null;
test.setup(dataFile, checksum);
FileInputStream streams[] = {
new FileInputStream(dataFile),
new FileInputStream(metaFile)
};
dataIn = streams[0];
metaIn = streams[1];
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
block.getBlockPoolId());
raf = new RandomAccessFile(
new File(sockDir.getDir().getAbsolutePath(),
UUID.randomUUID().toString()), "rw");
raf.setLength(8192);
FileInputStream shmStream = new FileInputStream(raf.getFD());
shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
ShortCircuitReplica replica =
new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
Time.now(), shm.allocAndRegisterSlot(
ExtendedBlockId.fromExtendedBlock(block)));
blockReaderLocal = new BlockReaderLocal.Builder(
new DfsClientConf.ShortCircuitConf(conf)).
setFilename(TEST_PATH.getName()).
setBlock(block).
setShortCircuitReplica(replica).
setCachingStrategy(new CachingStrategy(false, readahead)).
setVerifyChecksum(checksum).
build();
dataIn = null;
metaIn = null;
test.doTest(blockReaderLocal, original);
// BlockReaderLocal should not alter the file position.
Assert.assertEquals(0, streams[0].getChannel().position());
Assert.assertEquals(0, streams[1].getChannel().position());
} finally { } finally {
if (fsIn != null) fsIn.close(); if (fsIn != null) fsIn.close();
if (fs != null) fs.close(); if (fs != null) fs.close();
@ -227,6 +239,12 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
} }
} }
public void runBlockReaderLocalTest(BlockReaderLocalTest test,
boolean checksum, long readahead) throws IOException {
runBlockReaderLocalTest(BlockReaderLocalTest test,
boolean checksum, long readahead, 1);
}
private static class TestBlockReaderLocalImmediateClose private static class TestBlockReaderLocalImmediateClose
extends BlockReaderLocalTest { extends BlockReaderLocalTest {
} }
@ -242,7 +260,7 @@ private static class TestBlockReaderSimpleReads
@Override @Override
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
byte buf[] = new byte[TEST_LENGTH]; byte[] buf = new byte[TEST_LENGTH];
reader.readFully(buf, 0, 512); reader.readFully(buf, 0, 512);
assertArrayRegionsEqual(original, 0, buf, 0, 512); assertArrayRegionsEqual(original, 0, buf, 0, 512);
reader.readFully(buf, 512, 512); reader.readFully(buf, 512, 512);
@ -291,7 +309,7 @@ private static class TestBlockReaderLocalArrayReads2
@Override @Override
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
byte buf[] = new byte[TEST_LENGTH]; byte[] buf = new byte[TEST_LENGTH];
reader.readFully(buf, 0, 10); reader.readFully(buf, 0, 10);
assertArrayRegionsEqual(original, 0, buf, 0, 10); assertArrayRegionsEqual(original, 0, buf, 0, 10);
reader.readFully(buf, 10, 100); reader.readFully(buf, 10, 100);
@ -369,7 +387,7 @@ public void testBlockReaderLocalByteBufferReadsNoChecksum()
public void testBlockReaderLocalByteBufferReadsNoReadahead() public void testBlockReaderLocalByteBufferReadsNoReadahead()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
true, 0); true, 0);
} }
@Test @Test
@ -468,7 +486,7 @@ public void setup(File blockFile, boolean usingChecksums)
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
byte buf[] = new byte[TEST_LENGTH]; byte[] buf = new byte[TEST_LENGTH];
if (usingChecksums) { if (usingChecksums) {
try { try {
reader.readFully(buf, 0, 10); reader.readFully(buf, 0, 10);
@ -508,7 +526,7 @@ public void setup(File blockFile, boolean usingChecksums)
public void doTest(BlockReaderLocal reader, byte original[]) public void doTest(BlockReaderLocal reader, byte original[])
throws IOException { throws IOException {
byte buf[] = new byte[TEST_LENGTH]; byte[] buf = new byte[TEST_LENGTH];
try { try {
reader.readFully(buf, 0, 10); reader.readFully(buf, 0, 10);
assertArrayRegionsEqual(original, 0, buf, 0, 10); assertArrayRegionsEqual(original, 0, buf, 0, 10);
@ -845,4 +863,78 @@ public void testStatisticsForErasureCodingRead() throws IOException {
} }
} }
} }
private static class TestBlockReaderFiveShortCircutCachesReads
extends BlockReaderLocalTest {
@Override
public void doTest(BlockReaderLocal reader, byte[] original, int shift)
throws IOException {
byte[] buf = new byte[TEST_LENGTH];
reader.readFully(buf, 0, 512);
assertArrayRegionsEqual(original, shift, buf, 0, 512);
reader.readFully(buf, 512, 512);
assertArrayRegionsEqual(original, 512 + shift, buf, 512, 512);
reader.readFully(buf, 1024, 513);
assertArrayRegionsEqual(original, 1024 + shift, buf, 1024, 513);
reader.readFully(buf, 1537, 514);
assertArrayRegionsEqual(original, 1537 + shift, buf, 1537, 514);
// Readahead is always at least the size of one chunk in this test.
Assert.assertTrue(reader.getMaxReadaheadLength() >=
BlockReaderLocalTest.BYTES_PER_CHECKSUM);
}
}
@Test
public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
5);
}
@Test
public void testBlockReaderFiveShortCircutCachesReadsShortReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
5);
}
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoChecksum()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
5);
}
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, 0, 5);
}
@Test
public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
false, 0, 5);
}
@Test(expected = IllegalArgumentException.class)
public void testBlockReaderShortCircutCachesOutOfRangeBelow()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
0);
}
@Test(expected = IllegalArgumentException.class)
public void testBlockReaderShortCircutCachesOutOfRangeAbove()
throws IOException {
runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
555);
}
} }

View File

@ -431,7 +431,7 @@ public void testAllocShm() throws Exception {
cluster.waitActive(); cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
cache.getDfsClientShmManager().visit(new Visitor() { cache.getDfsClientShmManager().visit(new Visitor() {
@Override @Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@ -501,7 +501,7 @@ public void testShmBasedStaleness() throws Exception {
cluster.waitActive(); cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
String TEST_FILE = "/test_file"; String TEST_FILE = "/test_file";
final int TEST_FILE_LEN = 8193; final int TEST_FILE_LEN = 8193;
final int SEED = 0xFADED; final int SEED = 0xFADED;
@ -565,7 +565,7 @@ public void testUnlinkingReplicasInFileDescriptorCache() throws Exception {
cluster.waitActive(); cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
final ShortCircuitCache cache = final ShortCircuitCache cache =
fs.getClient().getClientContext().getShortCircuitCache(); fs.getClient().getClientContext().getShortCircuitCache(0);
cache.getDfsClientShmManager().visit(new Visitor() { cache.getDfsClientShmManager().visit(new Visitor() {
@Override @Override
public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
@ -877,19 +877,20 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception {
return peerCache; return peerCache;
}); });
Mockito.when(clientContext.getShortCircuitCache()).thenAnswer( Mockito.when(clientContext.getShortCircuitCache(
blk.getBlock().getBlockId())).thenAnswer(
(Answer<ShortCircuitCache>) shortCircuitCacheCall -> { (Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class); ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
Mockito.when(cache.allocShmSlot( Mockito.when(cache.allocShmSlot(
Mockito.any(DatanodeInfo.class), Mockito.any(DatanodeInfo.class),
Mockito.any(DomainPeer.class), Mockito.any(DomainPeer.class),
Mockito.any(MutableBoolean.class), Mockito.any(MutableBoolean.class),
Mockito.any(ExtendedBlockId.class), Mockito.any(ExtendedBlockId.class),
Mockito.anyString())) Mockito.anyString()))
.thenAnswer((Answer<Slot>) call -> null); .thenAnswer((Answer<Slot>) call -> null);
return cache; return cache;
} }
); );
DatanodeInfo[] nodes = blk.getLocations(); DatanodeInfo[] nodes = blk.getLocations();