diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java index 74f5dc69f2..c7222ca534 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java @@ -132,6 +132,11 @@ public final class CBlockConfigKeys { "dfs.cblock.cache.block.buffer.size"; public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512; + public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS = + "dfs.cblock.block.buffer.flush.interval.seconds"; + public static final int + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT = 60; + // jscsi server settings public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY = "dfs.cblock.jscsi.server.address"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java index 1174c332df..5efb461265 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java @@ -39,14 +39,15 @@ public class CBlockTargetMetrics { @Metric private MutableCounterLong numWriteOps; @Metric private MutableCounterLong numReadCacheHits; @Metric private MutableCounterLong numReadCacheMiss; + @Metric private MutableCounterLong numDirectBlockWrites; // Cblock internal Metrics - @Metric private MutableCounterLong numDirectBlockWrites; - @Metric private MutableCounterLong numBlockBufferFlush; @Metric private MutableCounterLong numDirtyLogBlockRead; - @Metric private MutableCounterLong numDirtyLogBlockUpdated; @Metric private MutableCounterLong numBytesDirtyLogRead; @Metric private MutableCounterLong numBytesDirtyLogWritten; + @Metric private MutableCounterLong numBlockBufferFlushCompleted; + @Metric private MutableCounterLong numBlockBufferFlushTriggered; + @Metric private MutableCounterLong numBlockBufferUpdates; // Failure Metrics @Metric private MutableCounterLong numReadLostBlocks; @@ -54,7 +55,10 @@ public class CBlockTargetMetrics { @Metric private MutableCounterLong numWriteIOExceptionRetryBlocks; @Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks; @Metric private MutableCounterLong numFailedDirectBlockWrites; - @Metric private MutableCounterLong numFailedDirtyBlockFlushes; + @Metric private MutableCounterLong numIllegalDirtyLogFiles; + @Metric private MutableCounterLong numFailedDirtyLogFileDeletes; + @Metric private MutableCounterLong numFailedBlockBufferFlushes; + @Metric private MutableCounterLong numInterruptedBufferWaits; // Latency based Metrics @Metric private MutableRate dbReadLatency; @@ -114,8 +118,12 @@ public void incNumFailedReadBlocks() { numFailedReadBlocks.incr(); } - public void incNumBlockBufferFlush() { - numBlockBufferFlush.incr(); + public void incNumBlockBufferFlushCompleted() { + numBlockBufferFlushCompleted.incr(); + } + + public void incNumBlockBufferFlushTriggered() { + numBlockBufferFlushTriggered.incr(); } public void incNumDirtyLogBlockRead() { @@ -126,16 +134,28 @@ public void incNumBytesDirtyLogRead(int bytes) { numBytesDirtyLogRead.incr(bytes); } - public void incNumDirtyLogBlockUpdated() { - numDirtyLogBlockUpdated.incr(); + public void incNumBlockBufferUpdates() { + numBlockBufferUpdates.incr(); } public void incNumBytesDirtyLogWritten(int bytes) { numBytesDirtyLogWritten.incr(bytes); } - public void incNumFailedDirtyBlockFlushes() { - numFailedDirtyBlockFlushes.incr(); + public void incNumFailedBlockBufferFlushes() { + numFailedBlockBufferFlushes.incr(); + } + + public void incNumInterruptedBufferWaits() { + numInterruptedBufferWaits.incr(); + } + + public void incNumIllegalDirtyLogFiles() { + numIllegalDirtyLogFiles.incr(); + } + + public void incNumFailedDirtyLogFileDeletes() { + numFailedDirtyLogFileDeletes.incr(); } public void updateDBReadLatency(long latency) { @@ -213,8 +233,13 @@ public long getNumWriteGenericExceptionRetryBlocks() { } @VisibleForTesting - public long getNumBlockBufferFlush() { - return numBlockBufferFlush.value(); + public long getNumBlockBufferFlushCompleted() { + return numBlockBufferFlushCompleted.value(); + } + + @VisibleForTesting + public long getNumBlockBufferFlushTriggered() { + return numBlockBufferFlushTriggered.value(); } @VisibleForTesting @@ -228,8 +253,8 @@ public long getNumBytesDirtyLogReads() { } @VisibleForTesting - public long getNumDirtyLogBlockUpdated() { - return numDirtyLogBlockUpdated.value(); + public long getNumBlockBufferUpdates() { + return numBlockBufferUpdates.value(); } @VisibleForTesting @@ -238,7 +263,22 @@ public long getNumBytesDirtyLogWritten() { } @VisibleForTesting - public long getNumFailedDirtyBlockFlushes() { - return numFailedDirtyBlockFlushes.value(); + public long getNumFailedBlockBufferFlushes() { + return numFailedBlockBufferFlushes.value(); + } + + @VisibleForTesting + public long getNumInterruptedBufferWaits() { + return numInterruptedBufferWaits.value(); + } + + @VisibleForTesting + public long getNumIllegalDirtyLogFiles() { + return numIllegalDirtyLogFiles.value(); + } + + @VisibleForTesting + public long getNumFailedDirtyLogFileDeletes() { + return numFailedDirtyLogFileDeletes.value(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java index 905d9bade3..c796f734f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java @@ -363,6 +363,7 @@ public void run() { if (finishCountMap.containsKey(message.getFileName())) { // In theory this should never happen. But if it happened, // we need to know it... + getTargetMetrics().incNumIllegalDirtyLogFiles(); LOG.error("Adding DirtyLog file again {} current count {} new {}", message.getFileName(), finishCountMap.get(message.getFileName()).expectedCount, @@ -516,6 +517,7 @@ public void incCount() { }*/ fileDeleted.set(true); } catch (Exception e) { + flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes(); LOG.error("Error deleting dirty log file:" + filePath, e); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java index 1273cd286c..d8e98391fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java @@ -32,19 +32,12 @@ import org.slf4j.LoggerFactory; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Paths; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; -import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT; - /** * A Queue that is used to write blocks asynchronously to the container. */ @@ -52,12 +45,6 @@ public class AsyncBlockWriter { private static final Logger LOG = LoggerFactory.getLogger(AsyncBlockWriter.class); - /** - * Right now we have a single buffer and we block when we write it to - * the file. - */ - private final ByteBuffer blockIDBuffer; - /** * XceiverClientManager is used to get client connections to a set of * machines. @@ -80,8 +67,8 @@ public class AsyncBlockWriter { * The cache this writer is operating against. */ private final CBlockLocalCache parentCache; - private final int blockBufferSize; - private final static String DIRTY_LOG_PREFIX = "DirtyLog"; + private final BlockBufferManager blockBufferManager; + public final static String DIRTY_LOG_PREFIX = "DirtyLog"; private AtomicLong localIoCount; /** @@ -95,14 +82,11 @@ public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) { Preconditions.checkNotNull(cache, "Cache cannot be null."); Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null."); localIoCount = new AtomicLong(); - blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, - DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE); - LOG.info("Cache: Block Size: {}", blockBufferSize); lock = new ReentrantLock(); notEmpty = lock.newCondition(); parentCache = cache; xceiverClientManager = cache.getClientManager(); - blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize); + blockBufferManager = new BlockBufferManager(config, parentCache); } public void start() throws IOException { @@ -113,6 +97,7 @@ public void start() throws IOException { throw new IllegalStateException("Cache Directory create failed, Cannot " + "continue. Log Dir: {}" + logDir); } + blockBufferManager.start(); } /** @@ -179,11 +164,7 @@ public void writeBlock(LogicalBlock block) throws IOException { block.getBlockID(), endTime - startTime, datahash); } block.clearData(); - parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated(); - blockIDBuffer.putLong(block.getBlockID()); - if (blockIDBuffer.remaining() == 0) { - writeBlockBufferToFile(blockIDBuffer); - } + blockBufferManager.addToBlockBuffer(block.getBlockID()); } else { Pipeline pipeline = parentCache.getPipeline(block.getBlockID()); String containerName = pipeline.getContainerName(); @@ -215,69 +196,11 @@ public void writeBlock(LogicalBlock block) throws IOException { } } - /** - * Write Block Buffer to file. - * - * @param blockBuffer - ByteBuffer - * @throws IOException - */ - private synchronized void writeBlockBufferToFile(ByteBuffer blockBuffer) - throws IOException { - long startTime = Time.monotonicNow(); - boolean append = false; - int bytesWritten = 0; - - // If there is nothing written to blockId buffer, - // then skip flushing of blockId buffer - if (blockBuffer.position() == 0) { - return; - } - - blockBuffer.flip(); - String fileName = - String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow()); - String log = Paths.get(parentCache.getDbPath().toString(), fileName) - .toString(); - - try { - FileChannel channel = new FileOutputStream(log, append).getChannel(); - bytesWritten = channel.write(blockBuffer); - } catch (Exception ex) { - LOG.error("Unable to sync the Block map to disk -- This might cause a " + - "data loss or corruption", ex); - parentCache.getTargetMetrics().incNumFailedDirtyBlockFlushes(); - throw ex; - } finally { - blockBuffer.clear(); - } - - parentCache.processDirtyMessage(fileName); - blockIDBuffer.clear(); - long endTime = Time.monotonicNow(); - if (parentCache.isTraceEnabled()) { - parentCache.getTracer().info( - "Task=DirtyBlockLogWrite,Time={} bytesWritten={}", - endTime - startTime, bytesWritten); - } - - parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten); - parentCache.getTargetMetrics().incNumBlockBufferFlush(); - parentCache.getTargetMetrics() - .updateBlockBufferFlushLatency(endTime - startTime); - LOG.debug("Block buffer writer bytesWritten:{} Time:{}", - bytesWritten, endTime - startTime); - } - /** * Shutdown by writing any pending I/O to dirtylog buffer. */ public void shutdown() { - try { - writeBlockBufferToFile(this.blockIDBuffer); - } catch (IOException e) { - LOG.error("Unable to sync the Block map to disk -- This might cause a " + - "data loss or corruption"); - } + blockBufferManager.shutdown(); } /** * Returns tracer. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java new file mode 100644 index 0000000000..c61a7a4c23 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferFlushTask.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; + +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; + +/** + * This task is responsible for flushing the BlockIDBuffer + * to Dirty Log File. This Dirty Log file is used later by + * ContainerCacheFlusher when the data is written to container + */ +public class BlockBufferFlushTask implements Runnable { + private static final Logger LOG = + LoggerFactory.getLogger(BlockBufferFlushTask.class); + private final CBlockLocalCache parentCache; + private final BlockBufferManager bufferManager; + private final ByteBuffer blockIDBuffer; + + BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache, + BlockBufferManager manager) { + this.parentCache = parentCache; + this.bufferManager = manager; + this.blockIDBuffer = blockIDBuffer; + } + + /** + * When an object implementing interface Runnable is used + * to create a thread, starting the thread causes the object's + * run method to be called in that separately executing + * thread. + *

+ * The general contract of the method run is that it may + * take any action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() { + try { + writeBlockBufferToFile(blockIDBuffer); + } catch (Exception e) { + parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes(); + LOG.error("Unable to sync the Block map to disk with " + + (blockIDBuffer.position() / Long.SIZE) + "entries " + + "-- NOTE: This might cause a data loss or corruption", e); + } finally { + bufferManager.releaseBuffer(blockIDBuffer); + } + } + + /** + * Write Block Buffer to file. + * + * @param buffer - ByteBuffer + * @throws IOException + */ + private void writeBlockBufferToFile(ByteBuffer buffer) + throws IOException { + long startTime = Time.monotonicNow(); + boolean append = false; + + // If there is nothing written to blockId buffer, + // then skip flushing of blockId buffer + if (buffer.position() == 0) { + return; + } + + buffer.flip(); + String fileName = + String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX, + Time.monotonicNow()); + String log = Paths.get(parentCache.getDbPath().toString(), fileName) + .toString(); + + FileChannel channel = new FileOutputStream(log, append).getChannel(); + int bytesWritten = channel.write(buffer); + channel.close(); + buffer.clear(); + parentCache.processDirtyMessage(fileName); + long endTime = Time.monotonicNow(); + if (parentCache.isTraceEnabled()) { + parentCache.getTracer().info( + "Task=DirtyBlockLogWrite,Time={} bytesWritten={}", + endTime - startTime, bytesWritten); + } + + parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted(); + parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten); + parentCache.getTargetMetrics(). + updateBlockBufferFlushLatency(endTime - startTime); + LOG.debug("Block buffer writer bytesWritten:{} Time:{}", + bytesWritten, endTime - startTime); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java new file mode 100644 index 0000000000..7e3aed1f73 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/BlockBufferManager.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_THREAD_PRIORITY; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT; + +/** + * This class manages the block ID buffer. + * Block ID Buffer keeps a list of blocks which are in leveldb cache + * This buffer is used later when the blocks are flushed to container + * + * Two blockIDBuffers are maintained so that write are not blocked when + * DirtyLog is being written. Once a blockIDBuffer is full, it will be + * enqueued for DirtyLog write while the other buffer accepts new write. + * Once the DirtyLog write is done, the buffer is returned back to the pool. + * + * There are three triggers for blockIDBuffer flush + * 1) BlockIDBuffer is full, + * 2) Time period defined for blockIDBuffer flush has elapsed. + * 3) Shutdown + */ +public class BlockBufferManager { + private static final Logger LOG = + LoggerFactory.getLogger(BlockBufferManager.class); + + private enum FlushReason { + BUFFER_FULL, + SHUTDOWN, + TIMER + }; + + private final int blockBufferSize; + private final CBlockLocalCache parentCache; + private final ScheduledThreadPoolExecutor scheduledExecutor; + private final ThreadPoolExecutor threadPoolExecutor; + private final int intervalSeconds; + private final ArrayBlockingQueue acquireQueue; + private final ArrayBlockingQueue workQueue; + private ByteBuffer currentBuffer; + + BlockBufferManager(Configuration config, CBlockLocalCache parentCache) { + this.parentCache = parentCache; + this.scheduledExecutor = new ScheduledThreadPoolExecutor(1); + + this.intervalSeconds = + config.getInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT); + + long keepAlive = config.getLong(DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS, + DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT); + this.workQueue = new ArrayBlockingQueue<>(2, true); + int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY, + DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT); + ThreadFactory workerThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("Cache Block Buffer Manager Thread #%d") + .setDaemon(true) + .setPriority(threadPri) + .build(); + /* + * starting a thread pool with core pool size of 1 and maximum of 2 threads + * as there are maximum of 2 buffers which can be flushed at the same time. + */ + this.threadPoolExecutor = new ThreadPoolExecutor(1, 2, + keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory, + new ThreadPoolExecutor.AbortPolicy()); + + this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE); + this.acquireQueue = new ArrayBlockingQueue<>(2, true); + + for (int i = 0; i < 2; i++) { + acquireQueue.add(ByteBuffer.allocate(blockBufferSize)); + } + // get the first buffer to be used + this.currentBuffer = acquireQueue.remove(); + + LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}", + blockBufferSize, intervalSeconds); + } + + // triggerBlockBufferFlush enqueues current ByteBuffer for flush and returns. + // This enqueue is asynchronous and hence triggerBlockBufferFlush will + // only block when there are no available buffers in acquireQueue + // Once the DirtyLog write is done, buffer is returned back to + // BlockBufferManager using releaseBuffer + private synchronized void triggerBlockBufferFlush(FlushReason reason) { + LOG.debug("Flush triggered because: " + reason.toString() + + " Num entries in buffer: " + + currentBuffer.position() / (Long.SIZE / Byte.SIZE) + + " Acquire Queue Size: " + acquireQueue.size()); + + parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered(); + BlockBufferFlushTask flushTask = + new BlockBufferFlushTask(currentBuffer, parentCache, this); + threadPoolExecutor.submit(flushTask); + try { + currentBuffer = acquireQueue.take(); + } catch (InterruptedException ex) { + currentBuffer = null; + parentCache.getTargetMetrics().incNumInterruptedBufferWaits(); + LOG.error("wait on take operation on acquire queue interrupted", ex); + Thread.currentThread().interrupt(); + } + } + + public synchronized void addToBlockBuffer(long blockId) { + parentCache.getTargetMetrics().incNumBlockBufferUpdates(); + currentBuffer.putLong(blockId); + // if no space left, flush this buffer + if (currentBuffer.remaining() == 0) { + triggerBlockBufferFlush(FlushReason.BUFFER_FULL); + } + } + + public void releaseBuffer(ByteBuffer buffer) { + if (buffer.position() != 0) { + LOG.error("requeuing a non empty buffer with:{}", + "elements enqueued in the acquire queue", + buffer.position() / (Long.SIZE / Byte.SIZE)); + buffer.reset(); + } + // There should always be space in the queue to add an element + acquireQueue.add(buffer); + } + + // Start a scheduled task to flush blockIDBuffer + public void start() { + Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER); + scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds, + intervalSeconds, TimeUnit.SECONDS); + threadPoolExecutor.prestartAllCoreThreads(); + } + + public void shutdown() { + triggerBlockBufferFlush(FlushReason.SHUTDOWN); + scheduledExecutor.shutdown(); + threadPoolExecutor.shutdown(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java new file mode 100644 index 0000000000..b488cbd70d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestBufferManager.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import com.google.common.primitives.Longs; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; +import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.TimeoutException; + + +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_DISK_CACHE_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_TRACE_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys. + DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS; + +/** + * Tests for Tests for local cache. + */ +public class TestBufferManager { + private final static long GB = 1024 * 1024 * 1024; + private final static int KB = 1024; + private static MiniOzoneCluster cluster; + private static OzoneConfiguration config; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static XceiverClientManager xceiverClientManager; + + @BeforeClass + public static void init() throws IOException { + config = new OzoneConfiguration(); + File p = GenericTestUtils.getTestDir(); + String path = p.getPath().concat( + TestOzoneContainer.class.getSimpleName()); + config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + config.setBoolean(DFS_CBLOCK_TRACE_IO, true); + config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + cluster = new MiniOzoneCluster.Builder(config) + .numDataNodes(1).setHandlerType("distributed").build(); + storageContainerLocationClient = cluster + .createStorageContainerLocationClient(); + xceiverClientManager = new XceiverClientManager(config); + } + + @AfterClass + public static void shutdown() throws InterruptedException { + if (cluster != null) { + cluster.shutdown(); + } + IOUtils.cleanup(null, storageContainerLocationClient, cluster); + } + + /** + * createContainerAndGetPipeline creates a set of containers and returns the + * Pipelines that define those containers. + * + * @param count - Number of containers to create. + * @return - List of Pipelines. + * @throws IOException + */ + private List createContainerAndGetPipeline(int count) + throws IOException { + List containerPipelines = new LinkedList<>(); + for (int x = 0; x < count; x++) { + String traceID = "trace" + RandomStringUtils.randomNumeric(4); + String containerName = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline = + storageContainerLocationClient.allocateContainer(containerName); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); + ContainerProtocolCalls.createContainer(client, traceID); + // This step is needed since we set private data on pipelines, when we + // read the list from CBlockServer. So we mimic that action here. + pipeline.setData(Longs.toByteArray(x)); + containerPipelines.add(pipeline); + } + return containerPipelines; + } + + /** + * This test writes some block to the cache and then shuts down the cache. + * The cache is then restarted to check that the + * correct number of blocks are read from Dirty Log + * + * @throws IOException + */ + @Test + public void testEmptyBlockBufferHandling() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat( + TestOzoneContainer.class.getSimpleName() + + GenericTestUtils.getMethodName() + + RandomStringUtils.randomNumeric(4)); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + List pipelines = createContainerAndGetPipeline(10); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + // Write data to the cache + cache.put(1, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(1, metrics.getNumWriteOps()); + cache.put(2, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); + Assert.assertEquals(2, metrics.getNumWriteOps()); + + // Store the previous block buffer position + Assert.assertEquals(2, metrics.getNumBlockBufferUpdates()); + // Simulate a shutdown by closing the cache + cache.close(); + Thread.sleep(1000); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE), + metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); + Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits()); + + // Restart cache and check that right number of entries are read + CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher newFlusher = + new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, newMetrics); + CBlockLocalCache newCache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(pipelines) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(newFlusher) + .setCBlockTargetMetrics(newMetrics) + .build(); + newCache.start(); + Thread fllushListenerThread = new Thread(newFlusher); + fllushListenerThread.setDaemon(true); + fllushListenerThread.start(); + + Thread.sleep(5000); + Assert.assertEquals(metrics.getNumBlockBufferUpdates(), + newMetrics.getNumDirtyLogBlockRead()); + Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead() + * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads()); + // Now shutdown again, nothing should be flushed + newFlusher.shutdown(); + Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates()); + Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten()); + } + + @Test + public void testPeriodicFlush() throws IOException, + InterruptedException, TimeoutException{ + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + flushTestConfig + .setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + Thread.sleep(8000); + // Ticks will be at 5s, 10s and so on, so this count should be 1 + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + // Nothing pushed to cache, so nothing should be written + Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); + cache.close(); + // After close, another trigger should happen but still no data written + Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); + Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); + } + + @Test + public void testSingleBufferFlush() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + + for (int i = 0; i < 511; i++) { + cache.put(i, data.getBytes(StandardCharsets.UTF_8)); + } + // After writing 511 block no flush should happen + Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted()); + + + // After one more block it should + cache.put(512, data.getBytes(StandardCharsets.UTF_8)); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + Thread.sleep(1000); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + cache.close(); + Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE), + metrics.getNumBytesDirtyLogWritten()); + } + + @Test + public void testMultipleBuffersFlush() throws IOException, + InterruptedException, TimeoutException { + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + + for (int i = 0; i < 4; i++) { + for (int j = 0; j < 512; j++) { + cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8)); + } + // Flush should be triggered after every 512 block write + Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered()); + } + Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles()); + Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes()); + cache.close(); + Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE), + metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted()); + } + + @Test + public void testSingleBlockFlush() throws IOException, + InterruptedException, TimeoutException{ + // Create a new config so that this tests write metafile to new location + OzoneConfiguration flushTestConfig = new OzoneConfiguration(); + URL p = flushTestConfig.getClass().getResource(""); + String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); + flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); + flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); + flushTestConfig + .setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5); + + String volumeName = "volume" + RandomStringUtils.randomNumeric(4); + String userName = "user" + RandomStringUtils.randomNumeric(4); + String data = RandomStringUtils.random(4 * KB); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, + xceiverClientManager, metrics); + CBlockLocalCache cache = CBlockLocalCache.newBuilder() + .setConfiguration(flushTestConfig) + .setVolumeName(volumeName) + .setUserName(userName) + .setPipelines(createContainerAndGetPipeline(10)) + .setClientManager(xceiverClientManager) + .setBlockSize(4 * KB) + .setVolumeSize(50 * GB) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + cache.start(); + cache.put(0, data.getBytes(StandardCharsets.UTF_8)); + Thread.sleep(8000); + // Ticks will be at 5s, 10s and so on, so this count should be 1 + Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered()); + // 1 block written to cache, which should be flushed + Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + cache.close(); + // After close, another trigger should happen but no data should be written + Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered()); + Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten()); + Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted()); + Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes()); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java index f15a5ed3fc..2dd72b3b18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestLocalBlockCache.java @@ -62,10 +62,6 @@ DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO; import static org.apache.hadoop.cblock.CBlockConfigKeys. DFS_CBLOCK_TRACE_IO; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT; -import static org.apache.hadoop.cblock.CBlockConfigKeys - .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; /** * Tests for Tests for local cache. @@ -238,15 +234,12 @@ public void testCacheWriteToRemote50KBlocks() throws IOException, cache.put(blockid, data.getBytes(StandardCharsets.UTF_8)); } Assert.assertEquals(totalBlocks, metrics.getNumWriteOps()); + Assert.assertEquals(totalBlocks, metrics.getNumBlockBufferUpdates()); LOG.info("Wrote 50K blocks, waiting for replication to finish."); GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); long endTime = Time.monotonicNow(); LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks, TimeUnit.MILLISECONDS.toSeconds(endTime - startTime)); - long blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, - DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT); - Assert.assertEquals(metrics.getNumWriteOps() / blockBufferSize, - metrics.getNumBlockBufferFlush()); // TODO: Read this data back. cache.close(); } @@ -278,6 +271,7 @@ public void testCacheInvalidBlock() throws IOException { Assert.assertEquals(1, metrics.getNumReadOps()); Assert.assertEquals(1, metrics.getNumReadLostBlocks()); Assert.assertEquals(1, metrics.getNumReadCacheMiss()); + cache.close(); } @Test @@ -507,95 +501,6 @@ public void testDirectIO() throws IOException, cache.close(); } - /** - * This test writes some block to the cache and then shuts down the cache. - * The cache is then restarted to check that the - * correct number of blocks are read from Dirty Log - * - * @throws IOException - */ - @Test - public void testEmptyBlockBufferHandling() throws IOException, - InterruptedException, TimeoutException { - // Create a new config so that this tests write metafile to new location - OzoneConfiguration flushTestConfig = new OzoneConfiguration(); - URL p = flushTestConfig.getClass().getResource(""); - String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName()); - flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); - flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true); - flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true); - - String volumeName = "volume" + RandomStringUtils.randomNumeric(4); - String userName = "user" + RandomStringUtils.randomNumeric(4); - String data = RandomStringUtils.random(4 * KB); - List pipelines = getContainerPipeline(10); - - CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, metrics); - CBlockLocalCache cache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(flusher) - .setCBlockTargetMetrics(metrics) - .build(); - cache.start(); - // Write data to the cache - cache.put(1, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(1, metrics.getNumWriteOps()); - cache.put(2, data.getBytes(StandardCharsets.UTF_8)); - Assert.assertEquals(0, metrics.getNumDirectBlockWrites()); - Assert.assertEquals(2, metrics.getNumWriteOps()); - - // Store the previous block buffer position - Assert.assertEquals(2, metrics.getNumDirtyLogBlockUpdated()); - // Simulate a shutdown by closing the cache - GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000); - cache.close(); - Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE), - metrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(0, metrics.getNumFailedDirtyBlockFlushes()); - - // Restart cache and check that right number of entries are read - CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create(); - ContainerCacheFlusher newFlusher = - new ContainerCacheFlusher(flushTestConfig, - xceiverClientManager, newMetrics); - CBlockLocalCache newCache = CBlockLocalCache.newBuilder() - .setConfiguration(flushTestConfig) - .setVolumeName(volumeName) - .setUserName(userName) - .setPipelines(pipelines) - .setClientManager(xceiverClientManager) - .setBlockSize(4 * KB) - .setVolumeSize(50 * GB) - .setFlusher(newFlusher) - .setCBlockTargetMetrics(newMetrics) - .build(); - newCache.start(); - Thread flushListenerThread = new Thread(newFlusher); - flushListenerThread.setDaemon(true); - flushListenerThread.start(); - - Thread.sleep(5000); - Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(), - newMetrics.getNumDirtyLogBlockRead()); - Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead() - * (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads()); - // Now shutdown again, nothing should be flushed - newCache.close(); - newFlusher.shutdown(); - Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated()); - Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten()); - Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes()); - } - /** * This test writes some block to the cache and then shuts down the cache * The cache is then restarted with "short.circuit.io" disable to check @@ -642,9 +547,9 @@ public void testContainerWrites() throws IOException, .setCBlockTargetMetrics(metrics) .build(); cache.start(); - Thread fllushListenerThread = new Thread(flusher); - fllushListenerThread.setDaemon(true); - fllushListenerThread.start(); + Thread flushListenerThread = new Thread(flusher); + flushListenerThread.setDaemon(true); + flushListenerThread.start(); Assert.assertTrue(cache.isShortCircuitIOEnabled()); // Write data to the cache for (int i = 0; i < 512; i++) { @@ -686,7 +591,7 @@ public void testContainerWrites() throws IOException, } Assert.assertEquals(0, newMetrics.getNumReadLostBlocks()); Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks()); - newFlusher.shutdown(); newCache.close(); + newFlusher.shutdown(); } }