HDFS-11694. Block Storage: Add Support for 2 BlockIDBuffers and also for periodic flush of BlockIDBuffer. Contributed by Mukul Kumar Singh
This commit is contained in:
parent
821740e4d3
commit
dfb0dab966
@ -132,6 +132,11 @@ public final class CBlockConfigKeys {
|
||||
"dfs.cblock.cache.block.buffer.size";
|
||||
public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 512;
|
||||
|
||||
public static final String DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS =
|
||||
"dfs.cblock.block.buffer.flush.interval.seconds";
|
||||
public static final int
|
||||
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT = 60;
|
||||
|
||||
// jscsi server settings
|
||||
public static final String DFS_CBLOCK_JSCSI_SERVER_ADDRESS_KEY =
|
||||
"dfs.cblock.jscsi.server.address";
|
||||
|
@ -39,14 +39,15 @@ public class CBlockTargetMetrics {
|
||||
@Metric private MutableCounterLong numWriteOps;
|
||||
@Metric private MutableCounterLong numReadCacheHits;
|
||||
@Metric private MutableCounterLong numReadCacheMiss;
|
||||
@Metric private MutableCounterLong numDirectBlockWrites;
|
||||
|
||||
// Cblock internal Metrics
|
||||
@Metric private MutableCounterLong numDirectBlockWrites;
|
||||
@Metric private MutableCounterLong numBlockBufferFlush;
|
||||
@Metric private MutableCounterLong numDirtyLogBlockRead;
|
||||
@Metric private MutableCounterLong numDirtyLogBlockUpdated;
|
||||
@Metric private MutableCounterLong numBytesDirtyLogRead;
|
||||
@Metric private MutableCounterLong numBytesDirtyLogWritten;
|
||||
@Metric private MutableCounterLong numBlockBufferFlushCompleted;
|
||||
@Metric private MutableCounterLong numBlockBufferFlushTriggered;
|
||||
@Metric private MutableCounterLong numBlockBufferUpdates;
|
||||
|
||||
// Failure Metrics
|
||||
@Metric private MutableCounterLong numReadLostBlocks;
|
||||
@ -54,7 +55,10 @@ public class CBlockTargetMetrics {
|
||||
@Metric private MutableCounterLong numWriteIOExceptionRetryBlocks;
|
||||
@Metric private MutableCounterLong numWriteGenericExceptionRetryBlocks;
|
||||
@Metric private MutableCounterLong numFailedDirectBlockWrites;
|
||||
@Metric private MutableCounterLong numFailedDirtyBlockFlushes;
|
||||
@Metric private MutableCounterLong numIllegalDirtyLogFiles;
|
||||
@Metric private MutableCounterLong numFailedDirtyLogFileDeletes;
|
||||
@Metric private MutableCounterLong numFailedBlockBufferFlushes;
|
||||
@Metric private MutableCounterLong numInterruptedBufferWaits;
|
||||
|
||||
// Latency based Metrics
|
||||
@Metric private MutableRate dbReadLatency;
|
||||
@ -114,8 +118,12 @@ public void incNumFailedReadBlocks() {
|
||||
numFailedReadBlocks.incr();
|
||||
}
|
||||
|
||||
public void incNumBlockBufferFlush() {
|
||||
numBlockBufferFlush.incr();
|
||||
public void incNumBlockBufferFlushCompleted() {
|
||||
numBlockBufferFlushCompleted.incr();
|
||||
}
|
||||
|
||||
public void incNumBlockBufferFlushTriggered() {
|
||||
numBlockBufferFlushTriggered.incr();
|
||||
}
|
||||
|
||||
public void incNumDirtyLogBlockRead() {
|
||||
@ -126,16 +134,28 @@ public void incNumBytesDirtyLogRead(int bytes) {
|
||||
numBytesDirtyLogRead.incr(bytes);
|
||||
}
|
||||
|
||||
public void incNumDirtyLogBlockUpdated() {
|
||||
numDirtyLogBlockUpdated.incr();
|
||||
public void incNumBlockBufferUpdates() {
|
||||
numBlockBufferUpdates.incr();
|
||||
}
|
||||
|
||||
public void incNumBytesDirtyLogWritten(int bytes) {
|
||||
numBytesDirtyLogWritten.incr(bytes);
|
||||
}
|
||||
|
||||
public void incNumFailedDirtyBlockFlushes() {
|
||||
numFailedDirtyBlockFlushes.incr();
|
||||
public void incNumFailedBlockBufferFlushes() {
|
||||
numFailedBlockBufferFlushes.incr();
|
||||
}
|
||||
|
||||
public void incNumInterruptedBufferWaits() {
|
||||
numInterruptedBufferWaits.incr();
|
||||
}
|
||||
|
||||
public void incNumIllegalDirtyLogFiles() {
|
||||
numIllegalDirtyLogFiles.incr();
|
||||
}
|
||||
|
||||
public void incNumFailedDirtyLogFileDeletes() {
|
||||
numFailedDirtyLogFileDeletes.incr();
|
||||
}
|
||||
|
||||
public void updateDBReadLatency(long latency) {
|
||||
@ -213,8 +233,13 @@ public long getNumWriteGenericExceptionRetryBlocks() {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumBlockBufferFlush() {
|
||||
return numBlockBufferFlush.value();
|
||||
public long getNumBlockBufferFlushCompleted() {
|
||||
return numBlockBufferFlushCompleted.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumBlockBufferFlushTriggered() {
|
||||
return numBlockBufferFlushTriggered.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -228,8 +253,8 @@ public long getNumBytesDirtyLogReads() {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumDirtyLogBlockUpdated() {
|
||||
return numDirtyLogBlockUpdated.value();
|
||||
public long getNumBlockBufferUpdates() {
|
||||
return numBlockBufferUpdates.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -238,7 +263,22 @@ public long getNumBytesDirtyLogWritten() {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumFailedDirtyBlockFlushes() {
|
||||
return numFailedDirtyBlockFlushes.value();
|
||||
public long getNumFailedBlockBufferFlushes() {
|
||||
return numFailedBlockBufferFlushes.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumInterruptedBufferWaits() {
|
||||
return numInterruptedBufferWaits.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumIllegalDirtyLogFiles() {
|
||||
return numIllegalDirtyLogFiles.value();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public long getNumFailedDirtyLogFileDeletes() {
|
||||
return numFailedDirtyLogFileDeletes.value();
|
||||
}
|
||||
}
|
||||
|
@ -363,6 +363,7 @@ public void run() {
|
||||
if (finishCountMap.containsKey(message.getFileName())) {
|
||||
// In theory this should never happen. But if it happened,
|
||||
// we need to know it...
|
||||
getTargetMetrics().incNumIllegalDirtyLogFiles();
|
||||
LOG.error("Adding DirtyLog file again {} current count {} new {}",
|
||||
message.getFileName(),
|
||||
finishCountMap.get(message.getFileName()).expectedCount,
|
||||
@ -516,6 +517,7 @@ public void incCount() {
|
||||
}*/
|
||||
fileDeleted.set(true);
|
||||
} catch (Exception e) {
|
||||
flusher.getTargetMetrics().incNumFailedDirtyLogFileDeletes();
|
||||
LOG.error("Error deleting dirty log file:" + filePath, e);
|
||||
}
|
||||
}
|
||||
|
@ -32,19 +32,12 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
|
||||
|
||||
/**
|
||||
* A Queue that is used to write blocks asynchronously to the container.
|
||||
*/
|
||||
@ -52,12 +45,6 @@ public class AsyncBlockWriter {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AsyncBlockWriter.class);
|
||||
|
||||
/**
|
||||
* Right now we have a single buffer and we block when we write it to
|
||||
* the file.
|
||||
*/
|
||||
private final ByteBuffer blockIDBuffer;
|
||||
|
||||
/**
|
||||
* XceiverClientManager is used to get client connections to a set of
|
||||
* machines.
|
||||
@ -80,8 +67,8 @@ public class AsyncBlockWriter {
|
||||
* The cache this writer is operating against.
|
||||
*/
|
||||
private final CBlockLocalCache parentCache;
|
||||
private final int blockBufferSize;
|
||||
private final static String DIRTY_LOG_PREFIX = "DirtyLog";
|
||||
private final BlockBufferManager blockBufferManager;
|
||||
public final static String DIRTY_LOG_PREFIX = "DirtyLog";
|
||||
private AtomicLong localIoCount;
|
||||
|
||||
/**
|
||||
@ -95,14 +82,11 @@ public AsyncBlockWriter(Configuration config, CBlockLocalCache cache) {
|
||||
Preconditions.checkNotNull(cache, "Cache cannot be null.");
|
||||
Preconditions.checkNotNull(cache.getCacheDB(), "DB cannot be null.");
|
||||
localIoCount = new AtomicLong();
|
||||
blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
|
||||
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
|
||||
LOG.info("Cache: Block Size: {}", blockBufferSize);
|
||||
lock = new ReentrantLock();
|
||||
notEmpty = lock.newCondition();
|
||||
parentCache = cache;
|
||||
xceiverClientManager = cache.getClientManager();
|
||||
blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize);
|
||||
blockBufferManager = new BlockBufferManager(config, parentCache);
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
@ -113,6 +97,7 @@ public void start() throws IOException {
|
||||
throw new IllegalStateException("Cache Directory create failed, Cannot " +
|
||||
"continue. Log Dir: {}" + logDir);
|
||||
}
|
||||
blockBufferManager.start();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -179,11 +164,7 @@ public void writeBlock(LogicalBlock block) throws IOException {
|
||||
block.getBlockID(), endTime - startTime, datahash);
|
||||
}
|
||||
block.clearData();
|
||||
parentCache.getTargetMetrics().incNumDirtyLogBlockUpdated();
|
||||
blockIDBuffer.putLong(block.getBlockID());
|
||||
if (blockIDBuffer.remaining() == 0) {
|
||||
writeBlockBufferToFile(blockIDBuffer);
|
||||
}
|
||||
blockBufferManager.addToBlockBuffer(block.getBlockID());
|
||||
} else {
|
||||
Pipeline pipeline = parentCache.getPipeline(block.getBlockID());
|
||||
String containerName = pipeline.getContainerName();
|
||||
@ -215,69 +196,11 @@ public void writeBlock(LogicalBlock block) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write Block Buffer to file.
|
||||
*
|
||||
* @param blockBuffer - ByteBuffer
|
||||
* @throws IOException
|
||||
*/
|
||||
private synchronized void writeBlockBufferToFile(ByteBuffer blockBuffer)
|
||||
throws IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
boolean append = false;
|
||||
int bytesWritten = 0;
|
||||
|
||||
// If there is nothing written to blockId buffer,
|
||||
// then skip flushing of blockId buffer
|
||||
if (blockBuffer.position() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
blockBuffer.flip();
|
||||
String fileName =
|
||||
String.format("%s.%s", DIRTY_LOG_PREFIX, Time.monotonicNow());
|
||||
String log = Paths.get(parentCache.getDbPath().toString(), fileName)
|
||||
.toString();
|
||||
|
||||
try {
|
||||
FileChannel channel = new FileOutputStream(log, append).getChannel();
|
||||
bytesWritten = channel.write(blockBuffer);
|
||||
} catch (Exception ex) {
|
||||
LOG.error("Unable to sync the Block map to disk -- This might cause a " +
|
||||
"data loss or corruption", ex);
|
||||
parentCache.getTargetMetrics().incNumFailedDirtyBlockFlushes();
|
||||
throw ex;
|
||||
} finally {
|
||||
blockBuffer.clear();
|
||||
}
|
||||
|
||||
parentCache.processDirtyMessage(fileName);
|
||||
blockIDBuffer.clear();
|
||||
long endTime = Time.monotonicNow();
|
||||
if (parentCache.isTraceEnabled()) {
|
||||
parentCache.getTracer().info(
|
||||
"Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
|
||||
endTime - startTime, bytesWritten);
|
||||
}
|
||||
|
||||
parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
|
||||
parentCache.getTargetMetrics().incNumBlockBufferFlush();
|
||||
parentCache.getTargetMetrics()
|
||||
.updateBlockBufferFlushLatency(endTime - startTime);
|
||||
LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
|
||||
bytesWritten, endTime - startTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown by writing any pending I/O to dirtylog buffer.
|
||||
*/
|
||||
public void shutdown() {
|
||||
try {
|
||||
writeBlockBufferToFile(this.blockIDBuffer);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to sync the Block map to disk -- This might cause a " +
|
||||
"data loss or corruption");
|
||||
}
|
||||
blockBufferManager.shutdown();
|
||||
}
|
||||
/**
|
||||
* Returns tracer.
|
||||
|
@ -0,0 +1,118 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
|
||||
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Paths;
|
||||
|
||||
/**
|
||||
* This task is responsible for flushing the BlockIDBuffer
|
||||
* to Dirty Log File. This Dirty Log file is used later by
|
||||
* ContainerCacheFlusher when the data is written to container
|
||||
*/
|
||||
public class BlockBufferFlushTask implements Runnable {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(BlockBufferFlushTask.class);
|
||||
private final CBlockLocalCache parentCache;
|
||||
private final BlockBufferManager bufferManager;
|
||||
private final ByteBuffer blockIDBuffer;
|
||||
|
||||
BlockBufferFlushTask(ByteBuffer blockIDBuffer, CBlockLocalCache parentCache,
|
||||
BlockBufferManager manager) {
|
||||
this.parentCache = parentCache;
|
||||
this.bufferManager = manager;
|
||||
this.blockIDBuffer = blockIDBuffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* When an object implementing interface <code>Runnable</code> is used
|
||||
* to create a thread, starting the thread causes the object's
|
||||
* <code>run</code> method to be called in that separately executing
|
||||
* thread.
|
||||
* <p>
|
||||
* The general contract of the method <code>run</code> is that it may
|
||||
* take any action whatsoever.
|
||||
*
|
||||
* @see Thread#run()
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
writeBlockBufferToFile(blockIDBuffer);
|
||||
} catch (Exception e) {
|
||||
parentCache.getTargetMetrics().incNumFailedBlockBufferFlushes();
|
||||
LOG.error("Unable to sync the Block map to disk with "
|
||||
+ (blockIDBuffer.position() / Long.SIZE) + "entries "
|
||||
+ "-- NOTE: This might cause a data loss or corruption", e);
|
||||
} finally {
|
||||
bufferManager.releaseBuffer(blockIDBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write Block Buffer to file.
|
||||
*
|
||||
* @param buffer - ByteBuffer
|
||||
* @throws IOException
|
||||
*/
|
||||
private void writeBlockBufferToFile(ByteBuffer buffer)
|
||||
throws IOException {
|
||||
long startTime = Time.monotonicNow();
|
||||
boolean append = false;
|
||||
|
||||
// If there is nothing written to blockId buffer,
|
||||
// then skip flushing of blockId buffer
|
||||
if (buffer.position() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
buffer.flip();
|
||||
String fileName =
|
||||
String.format("%s.%s", AsyncBlockWriter.DIRTY_LOG_PREFIX,
|
||||
Time.monotonicNow());
|
||||
String log = Paths.get(parentCache.getDbPath().toString(), fileName)
|
||||
.toString();
|
||||
|
||||
FileChannel channel = new FileOutputStream(log, append).getChannel();
|
||||
int bytesWritten = channel.write(buffer);
|
||||
channel.close();
|
||||
buffer.clear();
|
||||
parentCache.processDirtyMessage(fileName);
|
||||
long endTime = Time.monotonicNow();
|
||||
if (parentCache.isTraceEnabled()) {
|
||||
parentCache.getTracer().info(
|
||||
"Task=DirtyBlockLogWrite,Time={} bytesWritten={}",
|
||||
endTime - startTime, bytesWritten);
|
||||
}
|
||||
|
||||
parentCache.getTargetMetrics().incNumBlockBufferFlushCompleted();
|
||||
parentCache.getTargetMetrics().incNumBytesDirtyLogWritten(bytesWritten);
|
||||
parentCache.getTargetMetrics().
|
||||
updateBlockBufferFlushLatency(endTime - startTime);
|
||||
LOG.debug("Block buffer writer bytesWritten:{} Time:{}",
|
||||
bytesWritten, endTime - startTime);
|
||||
}
|
||||
}
|
@ -0,0 +1,182 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.cblock.jscsiHelper.cache.impl;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_CACHE_THREAD_PRIORITY;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT;
|
||||
|
||||
/**
|
||||
* This class manages the block ID buffer.
|
||||
* Block ID Buffer keeps a list of blocks which are in leveldb cache
|
||||
* This buffer is used later when the blocks are flushed to container
|
||||
*
|
||||
* Two blockIDBuffers are maintained so that write are not blocked when
|
||||
* DirtyLog is being written. Once a blockIDBuffer is full, it will be
|
||||
* enqueued for DirtyLog write while the other buffer accepts new write.
|
||||
* Once the DirtyLog write is done, the buffer is returned back to the pool.
|
||||
*
|
||||
* There are three triggers for blockIDBuffer flush
|
||||
* 1) BlockIDBuffer is full,
|
||||
* 2) Time period defined for blockIDBuffer flush has elapsed.
|
||||
* 3) Shutdown
|
||||
*/
|
||||
public class BlockBufferManager {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(BlockBufferManager.class);
|
||||
|
||||
private enum FlushReason {
|
||||
BUFFER_FULL,
|
||||
SHUTDOWN,
|
||||
TIMER
|
||||
};
|
||||
|
||||
private final int blockBufferSize;
|
||||
private final CBlockLocalCache parentCache;
|
||||
private final ScheduledThreadPoolExecutor scheduledExecutor;
|
||||
private final ThreadPoolExecutor threadPoolExecutor;
|
||||
private final int intervalSeconds;
|
||||
private final ArrayBlockingQueue<ByteBuffer> acquireQueue;
|
||||
private final ArrayBlockingQueue<Runnable> workQueue;
|
||||
private ByteBuffer currentBuffer;
|
||||
|
||||
BlockBufferManager(Configuration config, CBlockLocalCache parentCache) {
|
||||
this.parentCache = parentCache;
|
||||
this.scheduledExecutor = new ScheduledThreadPoolExecutor(1);
|
||||
|
||||
this.intervalSeconds =
|
||||
config.getInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS,
|
||||
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS_DEFAULT);
|
||||
|
||||
long keepAlive = config.getLong(DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS,
|
||||
DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT);
|
||||
this.workQueue = new ArrayBlockingQueue<>(2, true);
|
||||
int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY,
|
||||
DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT);
|
||||
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder()
|
||||
.setNameFormat("Cache Block Buffer Manager Thread #%d")
|
||||
.setDaemon(true)
|
||||
.setPriority(threadPri)
|
||||
.build();
|
||||
/*
|
||||
* starting a thread pool with core pool size of 1 and maximum of 2 threads
|
||||
* as there are maximum of 2 buffers which can be flushed at the same time.
|
||||
*/
|
||||
this.threadPoolExecutor = new ThreadPoolExecutor(1, 2,
|
||||
keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory,
|
||||
new ThreadPoolExecutor.AbortPolicy());
|
||||
|
||||
this.blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
|
||||
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * (Long.SIZE / Byte.SIZE);
|
||||
this.acquireQueue = new ArrayBlockingQueue<>(2, true);
|
||||
|
||||
for (int i = 0; i < 2; i++) {
|
||||
acquireQueue.add(ByteBuffer.allocate(blockBufferSize));
|
||||
}
|
||||
// get the first buffer to be used
|
||||
this.currentBuffer = acquireQueue.remove();
|
||||
|
||||
LOG.info("BufferManager: Buffer Size:{} FlushIntervalSeconds:{}",
|
||||
blockBufferSize, intervalSeconds);
|
||||
}
|
||||
|
||||
// triggerBlockBufferFlush enqueues current ByteBuffer for flush and returns.
|
||||
// This enqueue is asynchronous and hence triggerBlockBufferFlush will
|
||||
// only block when there are no available buffers in acquireQueue
|
||||
// Once the DirtyLog write is done, buffer is returned back to
|
||||
// BlockBufferManager using releaseBuffer
|
||||
private synchronized void triggerBlockBufferFlush(FlushReason reason) {
|
||||
LOG.debug("Flush triggered because: " + reason.toString() +
|
||||
" Num entries in buffer: " +
|
||||
currentBuffer.position() / (Long.SIZE / Byte.SIZE) +
|
||||
" Acquire Queue Size: " + acquireQueue.size());
|
||||
|
||||
parentCache.getTargetMetrics().incNumBlockBufferFlushTriggered();
|
||||
BlockBufferFlushTask flushTask =
|
||||
new BlockBufferFlushTask(currentBuffer, parentCache, this);
|
||||
threadPoolExecutor.submit(flushTask);
|
||||
try {
|
||||
currentBuffer = acquireQueue.take();
|
||||
} catch (InterruptedException ex) {
|
||||
currentBuffer = null;
|
||||
parentCache.getTargetMetrics().incNumInterruptedBufferWaits();
|
||||
LOG.error("wait on take operation on acquire queue interrupted", ex);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void addToBlockBuffer(long blockId) {
|
||||
parentCache.getTargetMetrics().incNumBlockBufferUpdates();
|
||||
currentBuffer.putLong(blockId);
|
||||
// if no space left, flush this buffer
|
||||
if (currentBuffer.remaining() == 0) {
|
||||
triggerBlockBufferFlush(FlushReason.BUFFER_FULL);
|
||||
}
|
||||
}
|
||||
|
||||
public void releaseBuffer(ByteBuffer buffer) {
|
||||
if (buffer.position() != 0) {
|
||||
LOG.error("requeuing a non empty buffer with:{}",
|
||||
"elements enqueued in the acquire queue",
|
||||
buffer.position() / (Long.SIZE / Byte.SIZE));
|
||||
buffer.reset();
|
||||
}
|
||||
// There should always be space in the queue to add an element
|
||||
acquireQueue.add(buffer);
|
||||
}
|
||||
|
||||
// Start a scheduled task to flush blockIDBuffer
|
||||
public void start() {
|
||||
Runnable scheduledTask = () -> triggerBlockBufferFlush(FlushReason.TIMER);
|
||||
scheduledExecutor.scheduleWithFixedDelay(scheduledTask, intervalSeconds,
|
||||
intervalSeconds, TimeUnit.SECONDS);
|
||||
threadPoolExecutor.prestartAllCoreThreads();
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
triggerBlockBufferFlush(FlushReason.SHUTDOWN);
|
||||
scheduledExecutor.shutdown();
|
||||
threadPoolExecutor.shutdown();
|
||||
}
|
||||
}
|
@ -0,0 +1,394 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.cblock;
|
||||
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics;
|
||||
import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher;
|
||||
import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
|
||||
import org.apache.hadoop.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.scm.XceiverClientSpi;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_DISK_CACHE_PATH_KEY;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_TRACE_IO;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS;
|
||||
|
||||
/**
|
||||
* Tests for Tests for local cache.
|
||||
*/
|
||||
public class TestBufferManager {
|
||||
private final static long GB = 1024 * 1024 * 1024;
|
||||
private final static int KB = 1024;
|
||||
private static MiniOzoneCluster cluster;
|
||||
private static OzoneConfiguration config;
|
||||
private static StorageContainerLocationProtocolClientSideTranslatorPB
|
||||
storageContainerLocationClient;
|
||||
private static XceiverClientManager xceiverClientManager;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws IOException {
|
||||
config = new OzoneConfiguration();
|
||||
File p = GenericTestUtils.getTestDir();
|
||||
String path = p.getPath().concat(
|
||||
TestOzoneContainer.class.getSimpleName());
|
||||
config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||
config.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||
config.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||
cluster = new MiniOzoneCluster.Builder(config)
|
||||
.numDataNodes(1).setHandlerType("distributed").build();
|
||||
storageContainerLocationClient = cluster
|
||||
.createStorageContainerLocationClient();
|
||||
xceiverClientManager = new XceiverClientManager(config);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdown() throws InterruptedException {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
IOUtils.cleanup(null, storageContainerLocationClient, cluster);
|
||||
}
|
||||
|
||||
/**
|
||||
* createContainerAndGetPipeline creates a set of containers and returns the
|
||||
* Pipelines that define those containers.
|
||||
*
|
||||
* @param count - Number of containers to create.
|
||||
* @return - List of Pipelines.
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<Pipeline> createContainerAndGetPipeline(int count)
|
||||
throws IOException {
|
||||
List<Pipeline> containerPipelines = new LinkedList<>();
|
||||
for (int x = 0; x < count; x++) {
|
||||
String traceID = "trace" + RandomStringUtils.randomNumeric(4);
|
||||
String containerName = "container" + RandomStringUtils.randomNumeric(10);
|
||||
Pipeline pipeline =
|
||||
storageContainerLocationClient.allocateContainer(containerName);
|
||||
XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
|
||||
ContainerProtocolCalls.createContainer(client, traceID);
|
||||
// This step is needed since we set private data on pipelines, when we
|
||||
// read the list from CBlockServer. So we mimic that action here.
|
||||
pipeline.setData(Longs.toByteArray(x));
|
||||
containerPipelines.add(pipeline);
|
||||
}
|
||||
return containerPipelines;
|
||||
}
|
||||
|
||||
/**
|
||||
* This test writes some block to the cache and then shuts down the cache.
|
||||
* The cache is then restarted to check that the
|
||||
* correct number of blocks are read from Dirty Log
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyBlockBufferHandling() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
// Create a new config so that this tests write metafile to new location
|
||||
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||
URL p = flushTestConfig.getClass().getResource("");
|
||||
String path = p.getPath().concat(
|
||||
TestOzoneContainer.class.getSimpleName() +
|
||||
GenericTestUtils.getMethodName() +
|
||||
RandomStringUtils.randomNumeric(4));
|
||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||
String data = RandomStringUtils.random(4 * KB);
|
||||
List<Pipeline> pipelines = createContainerAndGetPipeline(10);
|
||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||
xceiverClientManager, metrics);
|
||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||
.setConfiguration(flushTestConfig)
|
||||
.setVolumeName(volumeName)
|
||||
.setUserName(userName)
|
||||
.setPipelines(pipelines)
|
||||
.setClientManager(xceiverClientManager)
|
||||
.setBlockSize(4 * KB)
|
||||
.setVolumeSize(50 * GB)
|
||||
.setFlusher(flusher)
|
||||
.setCBlockTargetMetrics(metrics)
|
||||
.build();
|
||||
cache.start();
|
||||
// Write data to the cache
|
||||
cache.put(1, data.getBytes(StandardCharsets.UTF_8));
|
||||
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
|
||||
Assert.assertEquals(1, metrics.getNumWriteOps());
|
||||
cache.put(2, data.getBytes(StandardCharsets.UTF_8));
|
||||
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
|
||||
Assert.assertEquals(2, metrics.getNumWriteOps());
|
||||
|
||||
// Store the previous block buffer position
|
||||
Assert.assertEquals(2, metrics.getNumBlockBufferUpdates());
|
||||
// Simulate a shutdown by closing the cache
|
||||
cache.close();
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
|
||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
||||
Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
|
||||
metrics.getNumBytesDirtyLogWritten());
|
||||
Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
|
||||
Assert.assertEquals(0, metrics.getNumInterruptedBufferWaits());
|
||||
|
||||
// Restart cache and check that right number of entries are read
|
||||
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
|
||||
ContainerCacheFlusher newFlusher =
|
||||
new ContainerCacheFlusher(flushTestConfig,
|
||||
xceiverClientManager, newMetrics);
|
||||
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
|
||||
.setConfiguration(flushTestConfig)
|
||||
.setVolumeName(volumeName)
|
||||
.setUserName(userName)
|
||||
.setPipelines(pipelines)
|
||||
.setClientManager(xceiverClientManager)
|
||||
.setBlockSize(4 * KB)
|
||||
.setVolumeSize(50 * GB)
|
||||
.setFlusher(newFlusher)
|
||||
.setCBlockTargetMetrics(newMetrics)
|
||||
.build();
|
||||
newCache.start();
|
||||
Thread fllushListenerThread = new Thread(newFlusher);
|
||||
fllushListenerThread.setDaemon(true);
|
||||
fllushListenerThread.start();
|
||||
|
||||
Thread.sleep(5000);
|
||||
Assert.assertEquals(metrics.getNumBlockBufferUpdates(),
|
||||
newMetrics.getNumDirtyLogBlockRead());
|
||||
Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
|
||||
* (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
|
||||
// Now shutdown again, nothing should be flushed
|
||||
newFlusher.shutdown();
|
||||
Assert.assertEquals(0, newMetrics.getNumBlockBufferUpdates());
|
||||
Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPeriodicFlush() throws IOException,
|
||||
InterruptedException, TimeoutException{
|
||||
// Create a new config so that this tests write metafile to new location
|
||||
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||
URL p = flushTestConfig.getClass().getResource("");
|
||||
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||
flushTestConfig
|
||||
.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5);
|
||||
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||
xceiverClientManager, metrics);
|
||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||
.setConfiguration(flushTestConfig)
|
||||
.setVolumeName(volumeName)
|
||||
.setUserName(userName)
|
||||
.setPipelines(createContainerAndGetPipeline(10))
|
||||
.setClientManager(xceiverClientManager)
|
||||
.setBlockSize(4 * KB)
|
||||
.setVolumeSize(50 * GB)
|
||||
.setFlusher(flusher)
|
||||
.setCBlockTargetMetrics(metrics)
|
||||
.build();
|
||||
cache.start();
|
||||
Thread.sleep(8000);
|
||||
// Ticks will be at 5s, 10s and so on, so this count should be 1
|
||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
|
||||
// Nothing pushed to cache, so nothing should be written
|
||||
Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
|
||||
Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
|
||||
cache.close();
|
||||
// After close, another trigger should happen but still no data written
|
||||
Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
|
||||
Assert.assertEquals(0, metrics.getNumBytesDirtyLogWritten());
|
||||
Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
|
||||
Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleBufferFlush() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
// Create a new config so that this tests write metafile to new location
|
||||
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||
URL p = flushTestConfig.getClass().getResource("");
|
||||
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||
String data = RandomStringUtils.random(4 * KB);
|
||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||
xceiverClientManager, metrics);
|
||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||
.setConfiguration(flushTestConfig)
|
||||
.setVolumeName(volumeName)
|
||||
.setUserName(userName)
|
||||
.setPipelines(createContainerAndGetPipeline(10))
|
||||
.setClientManager(xceiverClientManager)
|
||||
.setBlockSize(4 * KB)
|
||||
.setVolumeSize(50 * GB)
|
||||
.setFlusher(flusher)
|
||||
.setCBlockTargetMetrics(metrics)
|
||||
.build();
|
||||
cache.start();
|
||||
|
||||
for (int i = 0; i < 511; i++) {
|
||||
cache.put(i, data.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
// After writing 511 block no flush should happen
|
||||
Assert.assertEquals(0, metrics.getNumBlockBufferFlushTriggered());
|
||||
Assert.assertEquals(0, metrics.getNumBlockBufferFlushCompleted());
|
||||
|
||||
|
||||
// After one more block it should
|
||||
cache.put(512, data.getBytes(StandardCharsets.UTF_8));
|
||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
|
||||
Thread.sleep(1000);
|
||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
||||
cache.close();
|
||||
Assert.assertEquals(512 * (Long.SIZE / Byte.SIZE),
|
||||
metrics.getNumBytesDirtyLogWritten());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleBuffersFlush() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
// Create a new config so that this tests write metafile to new location
|
||||
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||
URL p = flushTestConfig.getClass().getResource("");
|
||||
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||
String data = RandomStringUtils.random(4 * KB);
|
||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||
xceiverClientManager, metrics);
|
||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||
.setConfiguration(flushTestConfig)
|
||||
.setVolumeName(volumeName)
|
||||
.setUserName(userName)
|
||||
.setPipelines(createContainerAndGetPipeline(10))
|
||||
.setClientManager(xceiverClientManager)
|
||||
.setBlockSize(4 * KB)
|
||||
.setVolumeSize(50 * GB)
|
||||
.setFlusher(flusher)
|
||||
.setCBlockTargetMetrics(metrics)
|
||||
.build();
|
||||
cache.start();
|
||||
|
||||
for (int i = 0; i < 4; i++) {
|
||||
for (int j = 0; j < 512; j++) {
|
||||
cache.put(i * 512 + j, data.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
// Flush should be triggered after every 512 block write
|
||||
Assert.assertEquals(i + 1, metrics.getNumBlockBufferFlushTriggered());
|
||||
}
|
||||
Assert.assertEquals(0, metrics.getNumIllegalDirtyLogFiles());
|
||||
Assert.assertEquals(0, metrics.getNumFailedDirtyLogFileDeletes());
|
||||
cache.close();
|
||||
Assert.assertEquals(4 * 512 * (Long.SIZE / Byte.SIZE),
|
||||
metrics.getNumBytesDirtyLogWritten());
|
||||
Assert.assertEquals(5, metrics.getNumBlockBufferFlushTriggered());
|
||||
Assert.assertEquals(4, metrics.getNumBlockBufferFlushCompleted());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleBlockFlush() throws IOException,
|
||||
InterruptedException, TimeoutException{
|
||||
// Create a new config so that this tests write metafile to new location
|
||||
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||
URL p = flushTestConfig.getClass().getResource("");
|
||||
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||
flushTestConfig
|
||||
.setInt(DFS_CBLOCK_BLOCK_BUFFER_FLUSH_INTERVAL_SECONDS, 5);
|
||||
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||
String data = RandomStringUtils.random(4 * KB);
|
||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||
xceiverClientManager, metrics);
|
||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||
.setConfiguration(flushTestConfig)
|
||||
.setVolumeName(volumeName)
|
||||
.setUserName(userName)
|
||||
.setPipelines(createContainerAndGetPipeline(10))
|
||||
.setClientManager(xceiverClientManager)
|
||||
.setBlockSize(4 * KB)
|
||||
.setVolumeSize(50 * GB)
|
||||
.setFlusher(flusher)
|
||||
.setCBlockTargetMetrics(metrics)
|
||||
.build();
|
||||
cache.start();
|
||||
cache.put(0, data.getBytes(StandardCharsets.UTF_8));
|
||||
Thread.sleep(8000);
|
||||
// Ticks will be at 5s, 10s and so on, so this count should be 1
|
||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushTriggered());
|
||||
// 1 block written to cache, which should be flushed
|
||||
Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
|
||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
||||
cache.close();
|
||||
// After close, another trigger should happen but no data should be written
|
||||
Assert.assertEquals(2, metrics.getNumBlockBufferFlushTriggered());
|
||||
Assert.assertEquals(8, metrics.getNumBytesDirtyLogWritten());
|
||||
Assert.assertEquals(1, metrics.getNumBlockBufferFlushCompleted());
|
||||
Assert.assertEquals(0, metrics.getNumFailedBlockBufferFlushes());
|
||||
}
|
||||
}
|
@ -62,10 +62,6 @@
|
||||
DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys.
|
||||
DFS_CBLOCK_TRACE_IO;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||
.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.cblock.CBlockConfigKeys
|
||||
.DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE;
|
||||
|
||||
/**
|
||||
* Tests for Tests for local cache.
|
||||
@ -238,15 +234,12 @@ public void testCacheWriteToRemote50KBlocks() throws IOException,
|
||||
cache.put(blockid, data.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
Assert.assertEquals(totalBlocks, metrics.getNumWriteOps());
|
||||
Assert.assertEquals(totalBlocks, metrics.getNumBlockBufferUpdates());
|
||||
LOG.info("Wrote 50K blocks, waiting for replication to finish.");
|
||||
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
||||
long endTime = Time.monotonicNow();
|
||||
LOG.info("Time taken for writing {} blocks is {} seconds", totalBlocks,
|
||||
TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
|
||||
long blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE,
|
||||
DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT);
|
||||
Assert.assertEquals(metrics.getNumWriteOps() / blockBufferSize,
|
||||
metrics.getNumBlockBufferFlush());
|
||||
// TODO: Read this data back.
|
||||
cache.close();
|
||||
}
|
||||
@ -278,6 +271,7 @@ public void testCacheInvalidBlock() throws IOException {
|
||||
Assert.assertEquals(1, metrics.getNumReadOps());
|
||||
Assert.assertEquals(1, metrics.getNumReadLostBlocks());
|
||||
Assert.assertEquals(1, metrics.getNumReadCacheMiss());
|
||||
cache.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -507,95 +501,6 @@ public void testDirectIO() throws IOException,
|
||||
cache.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* This test writes some block to the cache and then shuts down the cache.
|
||||
* The cache is then restarted to check that the
|
||||
* correct number of blocks are read from Dirty Log
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testEmptyBlockBufferHandling() throws IOException,
|
||||
InterruptedException, TimeoutException {
|
||||
// Create a new config so that this tests write metafile to new location
|
||||
OzoneConfiguration flushTestConfig = new OzoneConfiguration();
|
||||
URL p = flushTestConfig.getClass().getResource("");
|
||||
String path = p.getPath().concat(TestOzoneContainer.class.getSimpleName());
|
||||
flushTestConfig.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_TRACE_IO, true);
|
||||
flushTestConfig.setBoolean(DFS_CBLOCK_ENABLE_SHORT_CIRCUIT_IO, true);
|
||||
|
||||
String volumeName = "volume" + RandomStringUtils.randomNumeric(4);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(4);
|
||||
String data = RandomStringUtils.random(4 * KB);
|
||||
List<Pipeline> pipelines = getContainerPipeline(10);
|
||||
|
||||
CBlockTargetMetrics metrics = CBlockTargetMetrics.create();
|
||||
ContainerCacheFlusher flusher = new ContainerCacheFlusher(flushTestConfig,
|
||||
xceiverClientManager, metrics);
|
||||
CBlockLocalCache cache = CBlockLocalCache.newBuilder()
|
||||
.setConfiguration(flushTestConfig)
|
||||
.setVolumeName(volumeName)
|
||||
.setUserName(userName)
|
||||
.setPipelines(pipelines)
|
||||
.setClientManager(xceiverClientManager)
|
||||
.setBlockSize(4 * KB)
|
||||
.setVolumeSize(50 * GB)
|
||||
.setFlusher(flusher)
|
||||
.setCBlockTargetMetrics(metrics)
|
||||
.build();
|
||||
cache.start();
|
||||
// Write data to the cache
|
||||
cache.put(1, data.getBytes(StandardCharsets.UTF_8));
|
||||
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
|
||||
Assert.assertEquals(1, metrics.getNumWriteOps());
|
||||
cache.put(2, data.getBytes(StandardCharsets.UTF_8));
|
||||
Assert.assertEquals(0, metrics.getNumDirectBlockWrites());
|
||||
Assert.assertEquals(2, metrics.getNumWriteOps());
|
||||
|
||||
// Store the previous block buffer position
|
||||
Assert.assertEquals(2, metrics.getNumDirtyLogBlockUpdated());
|
||||
// Simulate a shutdown by closing the cache
|
||||
GenericTestUtils.waitFor(() -> !cache.isDirtyCache(), 100, 20 * 1000);
|
||||
cache.close();
|
||||
Assert.assertEquals(2 * (Long.SIZE/ Byte.SIZE),
|
||||
metrics.getNumBytesDirtyLogWritten());
|
||||
Assert.assertEquals(0, metrics.getNumFailedDirtyBlockFlushes());
|
||||
|
||||
// Restart cache and check that right number of entries are read
|
||||
CBlockTargetMetrics newMetrics = CBlockTargetMetrics.create();
|
||||
ContainerCacheFlusher newFlusher =
|
||||
new ContainerCacheFlusher(flushTestConfig,
|
||||
xceiverClientManager, newMetrics);
|
||||
CBlockLocalCache newCache = CBlockLocalCache.newBuilder()
|
||||
.setConfiguration(flushTestConfig)
|
||||
.setVolumeName(volumeName)
|
||||
.setUserName(userName)
|
||||
.setPipelines(pipelines)
|
||||
.setClientManager(xceiverClientManager)
|
||||
.setBlockSize(4 * KB)
|
||||
.setVolumeSize(50 * GB)
|
||||
.setFlusher(newFlusher)
|
||||
.setCBlockTargetMetrics(newMetrics)
|
||||
.build();
|
||||
newCache.start();
|
||||
Thread flushListenerThread = new Thread(newFlusher);
|
||||
flushListenerThread.setDaemon(true);
|
||||
flushListenerThread.start();
|
||||
|
||||
Thread.sleep(5000);
|
||||
Assert.assertEquals(metrics.getNumDirtyLogBlockUpdated(),
|
||||
newMetrics.getNumDirtyLogBlockRead());
|
||||
Assert.assertEquals(newMetrics.getNumDirtyLogBlockRead()
|
||||
* (Long.SIZE/ Byte.SIZE), newMetrics.getNumBytesDirtyLogReads());
|
||||
// Now shutdown again, nothing should be flushed
|
||||
newCache.close();
|
||||
newFlusher.shutdown();
|
||||
Assert.assertEquals(0, newMetrics.getNumDirtyLogBlockUpdated());
|
||||
Assert.assertEquals(0, newMetrics.getNumBytesDirtyLogWritten());
|
||||
Assert.assertEquals(0, newMetrics.getNumFailedDirtyBlockFlushes());
|
||||
}
|
||||
|
||||
/**
|
||||
* This test writes some block to the cache and then shuts down the cache
|
||||
* The cache is then restarted with "short.circuit.io" disable to check
|
||||
@ -642,9 +547,9 @@ public void testContainerWrites() throws IOException,
|
||||
.setCBlockTargetMetrics(metrics)
|
||||
.build();
|
||||
cache.start();
|
||||
Thread fllushListenerThread = new Thread(flusher);
|
||||
fllushListenerThread.setDaemon(true);
|
||||
fllushListenerThread.start();
|
||||
Thread flushListenerThread = new Thread(flusher);
|
||||
flushListenerThread.setDaemon(true);
|
||||
flushListenerThread.start();
|
||||
Assert.assertTrue(cache.isShortCircuitIOEnabled());
|
||||
// Write data to the cache
|
||||
for (int i = 0; i < 512; i++) {
|
||||
@ -686,7 +591,7 @@ public void testContainerWrites() throws IOException,
|
||||
}
|
||||
Assert.assertEquals(0, newMetrics.getNumReadLostBlocks());
|
||||
Assert.assertEquals(0, newMetrics.getNumFailedReadBlocks());
|
||||
newFlusher.shutdown();
|
||||
newCache.close();
|
||||
newFlusher.shutdown();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user