HDFS-11667. Block Storage:Handling flushing of incomplete block id buffers during shutdown. Contributed by Mukul Kumar Singh.

This commit is contained in:
Chen Liang 2017-04-20 13:50:29 -07:00 committed by Owen O'Malley
parent 1a245accb5
commit 668b056984
6 changed files with 193 additions and 34 deletions

View File

@ -95,7 +95,7 @@ public void run() {
flusher.getLOG().debug("Time taken for Write Small File : {} ms",
endTime - startTime);
flusher.incrementremoteIO();
flusher.incrementRemoteIO();
} catch (IOException ex) {
flusher.getLOG().error("Writing of block failed, We have attempted " +
@ -119,7 +119,7 @@ private void writeRetryBlock(LogicalBlock currentBlock) {
File logDir = new File(this.dbPath);
if (!logDir.exists() && !logDir.mkdirs()) {
flusher.getLOG().error(
"Unable to create the log directory, Crticial error cannot continue");
"Unable to create the log directory, Critical error cannot continue");
return;
}
String log = Paths.get(this.dbPath, retryFileName).toString();

View File

@ -43,6 +43,11 @@ public class CBlockTargetMetrics {
@Metric private MutableCounterLong numBlockBufferFlush;
@Metric private MutableCounterLong numDirectBlockWrites;
@Metric private MutableCounterLong numFailedDirectBlockWrites;
@Metric private MutableCounterLong numDirtyLogBlockRead;
@Metric private MutableCounterLong numBytesDirtyLogRead;
@Metric private MutableCounterLong numDirtyLogBlockUpdated;
@Metric private MutableCounterLong numBytesDirtyLogWritten;
@Metric private MutableCounterLong numFailedDirtyBlockFlushes;
// Latency based Metrics
@Metric private MutableRate dbReadLatency;
@ -94,6 +99,26 @@ public void incNumBlockBufferFlush() {
numBlockBufferFlush.incr();
}
public void incNumDirtyLogBlockRead() {
numDirtyLogBlockRead.incr();
}
public void incNumBytesDirtyLogRead(int bytes) {
numBytesDirtyLogRead.incr(bytes);
}
public void incNumDirtyLogBlockUpdated() {
numDirtyLogBlockUpdated.incr();
}
public void incNumBytesDirtyLogWritten(int bytes) {
numBytesDirtyLogWritten.incr(bytes);
}
public void incNumFailedDirtyBlockFlushes() {
numFailedDirtyBlockFlushes.incr();
}
public void updateDBReadLatency(long latency) {
dbReadLatency.add(latency);
}
@ -157,4 +182,29 @@ public long getNumFailedDirectBlockWrites() {
public long getNumBlockBufferFlush() {
return numBlockBufferFlush.value();
}
@VisibleForTesting
public long getNumDirtyLogBlockRead() {
return numDirtyLogBlockRead.value();
}
@VisibleForTesting
public long getNumBytesDirtyLogReads() {
return numBytesDirtyLogRead.value();
}
@VisibleForTesting
public long getNumDirtyLogBlockUpdated() {
return numDirtyLogBlockUpdated.value();
}
@VisibleForTesting
public long getNumBytesDirtyLogWritten() {
return numBytesDirtyLogWritten.value();
}
@VisibleForTesting
public long getNumFailedDirtyBlockFlushes() {
return numFailedDirtyBlockFlushes.value();
}
}

View File

@ -213,7 +213,7 @@ public void shutdown() {
threadPoolExecutor.shutdown();
}
public long incrementremoteIO() {
public long incrementRemoteIO() {
return remoteIO.incrementAndGet();
}
@ -352,6 +352,7 @@ public void run() {
// bytes per long (which is calculated by number of bits per long
// divided by number of bits per byte) gives the number of blocks
int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE);
getTargetMetrics().incNumBytesDirtyLogRead(bytesRead);
if (finishCountMap.containsKey(message.getFileName())) {
// In theory this should never happen. But if it happened,
// we need to know it...
@ -369,6 +370,7 @@ public void run() {
LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(),
blockCount);
while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) {
getTargetMetrics().incNumDirtyLogBlockRead();
long blockID = blockIDBuffer.getLong();
LogicalBlock block = new DiskBlock(blockID, null, false);
BlockWriterTask blockWriterTask = new BlockWriterTask(block, this,

View File

@ -105,6 +105,16 @@ public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) {
blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
}
public void start() throws IOException {
File logDir = new File(parentCache.getDbPath().toString());
if (!logDir.exists() && !logDir.mkdirs()) {
LOG.error("Unable to create the log directory, Critical error cannot " +
"continue. Log Dir : {}", logDir);
throw new IllegalStateException("Cache Directory create failed, Cannot " +
"continue. Log Dir: {}" + logDir);
}
}
/**
* Return the log to write to.
*
@ -169,6 +179,11 @@ public void writeBlock(LogicalBlock block) throws IOException {
block.getBlockID(), endTime - startTime, datahash);
}
block.clearData();
if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
writeBlockBufferToFile(blockIDBuffer);
}
parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
blockIDBuffer.putLong(block.getBlockID());
} else {
Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
String containerName = pipeline.getContainerName();
@ -198,49 +213,59 @@ public void writeBlock(LogicalBlock block) throws IOException {
block.clearData();
}
}
if (blockIDBuffer.remaining() <= (Long.SIZE / Byte.SIZE)) {
long startTime = Time.monotonicNow();
blockIDBuffer.flip();
writeBlockBufferToFile(blockIDBuffer);
blockIDBuffer.clear();
long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) {
parentCache.getTracer().info(
"Task=DirtyBlockLogWrite,Time={}", endTime - startTime);
}
parentCache.getTargetMetrics().incNumBlockBufferFlush();
parentCache.getTargetMetrics()
.updateBlockBufferFlushLatency(endTime - startTime);
}
blockIDBuffer.putLong(block.getBlockID());
}
/**
* Write Block Buffer to file.
*
* @param blockID - ByteBuffer
* @param blockBuffer - ByteBuffer
* @throws IOException
*/
private void writeBlockBufferToFile(ByteBuffer blockID)
private synchronized void writeBlockBufferToFile(ByteBuffer blockBuffer)
throws IOException {
long startTime = Time.monotonicNow();
boolean append = false;
int bytesWritten = 0;
// If there is nothing written to blockId buffer,
// then skip flushing of blockId buffer
if (blockBuffer.position() == 0) {
return;
}
blockBuffer.flip();
String fileName =
String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
File logDir = new File(parentCache.getDbPath().toString());
if (!logDir.exists() && !logDir.mkdirs()) {
LOG.error("Unable to create the log directory, Critical error cannot " +
"continue. Log Dir : {}", logDir);
throw new IllegalStateException("Cache Directory create failed, Cannot " +
"continue. Log Dir: {}" + logDir);
}
String log = Paths.get(parentCache.getDbPath().toString(), fileName)
.toString();
try (FileChannel channel = new FileOutputStream(log, append).getChannel()) {
channel.write(blockID);
try {
FileChannel channel = new FileOutputStream(log, append).getChannel();
bytesWritten = channel.write(blockBuffer);
} catch (Exception ex) {
LOG.error("Unable to sync the Block map to disk -- This might cause a " +
"data loss or corruption", ex);
parentCache.getTargetMetrics().incNumFailedDirtyBlockFlushes();
throw ex;
} finally {
blockBuffer.clear();
}
blockID.clear();
parentCache.processDirtyMessage(fileName);
blockIDBuffer.clear();
long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) {
parentCache.getTracer().info(
"Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
endTime - startTime, bytesWritten);
}
parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
parentCache.getTargetMetrics().incNumBlockBufferFlush();
parentCache.getTargetMetrics()
.updateBlockBufferFlushLatency(endTime - startTime);
LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
bytesWritten, endTime - startTime);
}
/**

View File

@ -199,7 +199,7 @@ public void processDirtyMessage(String fileName) {
* @param dbPathString - Path to db
* @return long bytes remaining.
*/
private static long getRemaningDiskSpace(String dbPathString) {
private static long getRemainingDiskSpace(String dbPathString) {
try {
URI fileUri = new URI("file:///");
Path dbPath = Paths.get(fileUri).resolve(dbPathString);
@ -298,7 +298,7 @@ public void flush() throws IOException {
@Override
public void start() throws IOException {
// This is a No-op for us. We start when we bootup.
blockWriter.start();
}
@Override
@ -457,7 +457,7 @@ private static long computeCacheSize(Configuration configuration,
if (StringUtils.isBlank(dbPath)) {
return cacheSize;
}
long spaceRemaining = getRemaningDiskSpace(dbPath);
long spaceRemaining = getRemainingDiskSpace(dbPath);
double cacheRatio = 1.0;
if (spaceRemaining < volumeSize) {

View File

@ -157,6 +157,7 @@ public void testCacheWriteRead() throws IOException,
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(1, metrics.getNumWriteOps());
// Please note that this read is from the local cache.
@ -202,6 +203,7 @@ public void testCacheWriteToRemoteContainer() throws IOException,
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
cache.close();
@ -228,6 +230,7 @@ public void testCacheWriteToRemote50KBlocks() throws IOException,
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
long startTime = Time.monotonicNow();
for (long blockid = 0; blockid < totalBlocks; blockid++) {
cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
@ -265,6 +268,7 @@ public void testCacheInvalidBlock() throws IOException {
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
// Read a non-existent block ID.
LogicalBlock block = cache.get(blockID);
Assert.assertNotNull(block);
@ -298,6 +302,7 @@ public void testReadWriteCorrectness() throws IOException,
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
for (int x = 0; x < blockCount; x++) {
String data = RandomStringUtils.random(4 * 1024);
String dataHash = DigestUtils.sha256Hex(data);
@ -342,7 +347,7 @@ public void testReadWriteCorrectness() throws IOException,
.setFlusher(newflusher)
.setCBlockTargetMetrics(newMetrics)
.build();
newCache.start();
for (Map.Entry<Long, String> entry : blockShaMap.entrySet()) {
LogicalBlock block = newCache.get(entry.getKey());
String blockSha = DigestUtils.sha256Hex(block.getData().array());
@ -471,6 +476,7 @@ public void testDirectIO() throws IOException,
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
Assert.assertFalse(cache.isShortCircuitIOEnabled());
cache.put(blockID, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(1, metrics.getNumDirectBlockWrites());
@ -498,4 +504,80 @@ public void testDirectIO() throws IOException,
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
cache.close();
}
/**
* This test writes some block to the cache and then shuts down the cache.
* The cache is then restarted to check that the
* correct number of blocks are read from Dirty Log
*
* @throws IOException
*/
@Test
public void testEmptyBlockBufferHandling() throws IOException,
InterruptedException, TimeoutException {
// Create a new config so that this tests write metafile to new location
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
URL p = flushTestConfig.getClass().getResource("");
String path = p.getPath().concat(
TestOzoneContainer.class.getSimpleName()
+ "/testEmptyBlockBufferHandling");
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
String userName = "user" + RandomStringUtils.randomNumeric(4);
String data = RandomStringUtils.random(4 * KB);
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, metrics);
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
.setConfiguration(flushTestConfig)
.setVolumeName(volumeName)
.setUserName(userName)
.setPipelines(getContainerPipeline(10))
.setClientManager(xceiverClientManager)
.setBlockSize(4 * KB)
.setVolumeSize(50 * GB)
.setFlusher(flusher)
.setCBlockTargetMetrics(metrics)
.build();
cache.start();
// Write data to the cache
cache.put(1, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(1, metrics.getNumWriteOps());
cache.put(2, data.getBytes(StandardCharsets.UTF_8));
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
Assert.assertEquals(2, metrics.getNumWriteOps());
// Store the previous block buffer position
Assert.assertEquals(2, metrics.getNumDirtyLogBlockUpdated());
// Simulate a shutdown by closing the cache
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
cache.close();
Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
metrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(0, metrics.getNumFailedDirtyBlockFlushes());
// Restart cache and check that right number of entries are read
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
ContainerCacheFlusher newFlusher =
new ContainerCacheFlusher(flushTestConfig,
xceiverClientManager, newMetrics);
Thread fllushListenerThread = new Thread(newFlusher);
fllushListenerThread.setDaemon(true);
fllushListenerThread.start();
Thread.sleep(5000);
Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
newMetrics.getNumDirtyLogBlockRead());
Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
* (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
// Now shutdown again, nothing should be flushed
newFlusher.shutdown();
Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
}
}