HDFS-6107. When a block cannot be cached due to limited space on the DataNode, it becomes uncacheable (cmccabe)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1578508 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bc00bc966b
commit
d265dd9eb0
@ -626,6 +626,9 @@ Release 2.4.0 - UNRELEASED
|
||||
HDFS-6094. The same block can be counted twice towards safe mode
|
||||
threshold. (Arpit Agarwal)
|
||||
|
||||
HDFS-6107. When a block can't be cached due to limited space on the
|
||||
DataNode, that block becomes uncacheable (cmccabe)
|
||||
|
||||
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
|
||||
|
||||
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
|
||||
|
@ -597,14 +597,12 @@ private boolean processCommandFromActive(DatanodeCommand cmd,
|
||||
blockIdCmd.getBlockPoolId() + " of [" +
|
||||
blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
|
||||
dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
|
||||
dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
|
||||
break;
|
||||
case DatanodeProtocol.DNA_UNCACHE:
|
||||
LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
|
||||
blockIdCmd.getBlockPoolId() + " of [" +
|
||||
blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
|
||||
dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
|
||||
dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
|
||||
break;
|
||||
case DatanodeProtocol.DNA_SHUTDOWN:
|
||||
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
|
||||
|
@ -1040,7 +1040,7 @@ public InterDatanodeProtocol run() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
DataNodeMetrics getMetrics() {
|
||||
public DataNodeMetrics getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
|
||||
|
@ -337,15 +337,16 @@ public void run() {
|
||||
ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
|
||||
key.getBlockId(), length, genstamp);
|
||||
long newUsedBytes = usedBytesCount.reserve(length);
|
||||
if (newUsedBytes < 0) {
|
||||
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
|
||||
" more bytes in the cache: " +
|
||||
DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
|
||||
" of " + maxBytes + " exceeded.");
|
||||
numBlocksFailedToCache.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
boolean reservedBytes = false;
|
||||
try {
|
||||
if (newUsedBytes < 0) {
|
||||
LOG.warn("Failed to cache " + key + ": could not reserve " + length +
|
||||
" more bytes in the cache: " +
|
||||
DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
|
||||
" of " + maxBytes + " exceeded.");
|
||||
return;
|
||||
}
|
||||
reservedBytes = true;
|
||||
try {
|
||||
blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0);
|
||||
metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
|
||||
@ -391,10 +392,13 @@ public void run() {
|
||||
}
|
||||
dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
|
||||
numBlocksCached.addAndGet(1);
|
||||
dataset.datanode.getMetrics().incrBlocksCached(1);
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
newUsedBytes = usedBytesCount.release(length);
|
||||
if (reservedBytes) {
|
||||
newUsedBytes = usedBytesCount.release(length);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Caching of " + key + " was aborted. We are now " +
|
||||
"caching only " + newUsedBytes + " + bytes in total.");
|
||||
@ -439,6 +443,7 @@ public void run() {
|
||||
long newUsedBytes =
|
||||
usedBytesCount.release(value.mappableBlock.getLength());
|
||||
numBlocksCached.addAndGet(-1);
|
||||
dataset.datanode.getMetrics().incrBlocksUncached(1);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Uncaching of " + key + " completed. " +
|
||||
"usedBytes = " + newUsedBytes);
|
||||
|
@ -1123,6 +1123,11 @@ public Boolean get() {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
LOG.info("verifyExpectedCacheUsage: got " +
|
||||
curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
|
||||
curBlocks + "/" + expectedBlocks + " blocks cached. " +
|
||||
"memlock limit = " +
|
||||
NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
|
||||
return true;
|
||||
}
|
||||
}, 100, 60000);
|
||||
|
@ -40,12 +40,15 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.HdfsBlockLocation;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.BlockReaderTestUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.LogVerificationAppender;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
|
||||
@ -80,6 +83,7 @@
|
||||
import org.apache.log4j.LogManager;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
public class TestFsDatasetCache {
|
||||
private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
|
||||
@ -349,10 +353,13 @@ public Boolean get() {
|
||||
fsd.getNumBlocksFailedToCache() > 0);
|
||||
|
||||
// Uncache the n-1 files
|
||||
int curCachedBlocks = 16;
|
||||
for (int i=0; i<numFiles-1; i++) {
|
||||
setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
|
||||
total -= rounder.round(fileSizes[i]);
|
||||
DFSTestUtil.verifyExpectedCacheUsage(total, 4 * (numFiles - 2 - i), fsd);
|
||||
long uncachedBytes = rounder.round(fileSizes[i]);
|
||||
total -= uncachedBytes;
|
||||
curCachedBlocks -= uncachedBytes / BLOCK_SIZE;
|
||||
DFSTestUtil.verifyExpectedCacheUsage(total, curCachedBlocks, fsd);
|
||||
}
|
||||
LOG.info("finishing testFilesExceedMaxLockedMemory");
|
||||
}
|
||||
@ -491,4 +498,78 @@ public Boolean get() {
|
||||
MetricsAsserts.assertCounter("BlocksCached", 1l, dnMetrics);
|
||||
MetricsAsserts.assertCounter("BlocksUncached", 1l, dnMetrics);
|
||||
}
|
||||
|
||||
@Test(timeout=60000)
|
||||
public void testReCacheAfterUncache() throws Exception {
|
||||
final int TOTAL_BLOCKS_PER_CACHE =
|
||||
Ints.checkedCast(CACHE_CAPACITY / BLOCK_SIZE);
|
||||
BlockReaderTestUtil.enableHdfsCachingTracing();
|
||||
Assert.assertEquals(0, CACHE_CAPACITY % BLOCK_SIZE);
|
||||
|
||||
// Create a small file
|
||||
final Path SMALL_FILE = new Path("/smallFile");
|
||||
DFSTestUtil.createFile(fs, SMALL_FILE,
|
||||
BLOCK_SIZE, (short)1, 0xcafe);
|
||||
|
||||
// Create a file that will take up the whole cache
|
||||
final Path BIG_FILE = new Path("/bigFile");
|
||||
DFSTestUtil.createFile(fs, BIG_FILE,
|
||||
TOTAL_BLOCKS_PER_CACHE * BLOCK_SIZE, (short)1, 0xbeef);
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
dfs.addCachePool(new CachePoolInfo("pool"));
|
||||
final long bigCacheDirectiveId =
|
||||
dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
|
||||
.setPool("pool").setPath(BIG_FILE).setReplication((short)1).build());
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
|
||||
long blocksCached =
|
||||
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics);
|
||||
if (blocksCached != TOTAL_BLOCKS_PER_CACHE) {
|
||||
LOG.info("waiting for " + TOTAL_BLOCKS_PER_CACHE + " to " +
|
||||
"be cached. Right now only " + blocksCached + " blocks are cached.");
|
||||
return false;
|
||||
}
|
||||
LOG.info(TOTAL_BLOCKS_PER_CACHE + " blocks are now cached.");
|
||||
return true;
|
||||
}
|
||||
}, 1000, 30000);
|
||||
|
||||
// Try to cache a smaller file. It should fail.
|
||||
final long shortCacheDirectiveId =
|
||||
dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
|
||||
.setPool("pool").setPath(SMALL_FILE).setReplication((short)1).build());
|
||||
Thread.sleep(10000);
|
||||
MetricsRecordBuilder dnMetrics = getMetrics(dn.getMetrics().name());
|
||||
Assert.assertEquals(TOTAL_BLOCKS_PER_CACHE,
|
||||
MetricsAsserts.getLongCounter("BlocksCached", dnMetrics));
|
||||
|
||||
// Uncache the big file and verify that the small file can now be
|
||||
// cached (regression test for HDFS-XXXX)
|
||||
dfs.removeCacheDirective(bigCacheDirectiveId);
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
RemoteIterator<CacheDirectiveEntry> iter;
|
||||
try {
|
||||
iter = dfs.listCacheDirectives(
|
||||
new CacheDirectiveInfo.Builder().build());
|
||||
CacheDirectiveEntry entry;
|
||||
do {
|
||||
entry = iter.next();
|
||||
} while (entry.getInfo().getId() != shortCacheDirectiveId);
|
||||
if (entry.getStats().getFilesCached() != 1) {
|
||||
LOG.info("waiting for directive " + shortCacheDirectiveId +
|
||||
" to be cached. stats = " + entry.getStats());
|
||||
return false;
|
||||
}
|
||||
LOG.info("directive " + shortCacheDirectiveId + " has been cached.");
|
||||
} catch (IOException e) {
|
||||
Assert.fail("unexpected exception" + e.toString());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}, 1000, 30000);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user