diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java index b4c3bd953a..433c94ca59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/Pipeline.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -35,6 +36,12 @@ public class Pipeline { private String containerName; private String leaderID; private Map datanodes; + /** + * Allows you to maintain private data on pipelines. + * This is not serialized via protobuf, just allows us to maintain some + * private data. + */ + private byte[] data; /** * Constructs a new pipeline data structure. @@ -44,6 +51,7 @@ public class Pipeline { public Pipeline(String leaderID) { this.leaderID = leaderID; datanodes = new TreeMap<>(); + data = null; } /** @@ -124,4 +132,27 @@ public String getContainerName() { public void setContainerName(String containerName) { this.containerName = containerName; } + + /** + * Set private data on pipeline. + * @param data -- private data. + */ + public void setData(byte[] data) { + if (data != null) { + this.data = Arrays.copyOf(data, data.length); + } + } + + /** + * Returns private data that is set on this pipeline. + * + * @return blob, the user can interpret it any way they like. + */ + public byte[] getData() { + if (this.data != null) { + return Arrays.copyOf(this.data, this.data.length); + } else { + return null; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java index cafc5a971c..bc0d68c69b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.cblock; +import static java.lang.Thread.NORM_PRIORITY; + /** * This class contains constants for configuration keys used in CBlock. */ public final class CBlockConfigKeys { - public static final String DFS_CBLOCK_ENABLED_KEY = - "dfs.cblock.enabled"; public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_KEY = "dfs.cblock.servicerpc-address"; public static final String DFS_CBLOCK_SERVICERPC_PORT_KEY = @@ -33,8 +33,6 @@ public final class CBlockConfigKeys { "dfs.cblock.servicerpc.hostname"; public static final String DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT = "0.0.0.0"; - public static final String DFS_CBLOCK_RPCSERVICE_IP_DEFAULT = - "0.0.0.0"; public static final String DFS_CBLOCK_SERVICERPC_ADDRESS_DEFAULT = DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT + ":" + DFS_CBLOCK_SERVICERPC_PORT_DEFAULT; @@ -46,7 +44,7 @@ public final class CBlockConfigKeys { public static final int DFS_CBLOCK_JSCSI_PORT_DEFAULT = 9811; public static final String DFS_CBLOCK_JSCSIRPC_ADDRESS_DEFAULT = - DFS_CBLOCK_RPCSERVICE_IP_DEFAULT + DFS_CBLOCK_SERVICERPC_HOSTNAME_DEFAULT + ":" + DFS_CBLOCK_JSCSI_PORT_DEFAULT; @@ -69,6 +67,66 @@ public final class CBlockConfigKeys { public static final String DFS_CBLOCK_SERVICE_LEVELDB_PATH_DEFAULT = "/tmp/cblock_levelDB.dat"; + + public static final String DFS_CBLOCK_DISK_CACHE_PATH_KEY = + "dfs.cblock.disk.cache.path"; + public static final String DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT = + "/tmp/cblockCacheDB"; + /** + * Setting this flag to true makes the block layer compute a sha256 hash of + * the data and log that information along with block ID. This is very + * useful for doing trace based simulation of various workloads. Since it is + * computing a hash for each block this could be expensive, hence default + * is false. + */ + public static final String DFS_CBLOCK_TRACE_IO = "dfs.cblock.trace.io"; + public static final boolean DFS_CBLOCK_TRACE_IO_DEFAULT = false; + + /** + * Cache size in 1000s of entries. 256 indicates 256 * 1024. + */ + public static final String DFS_CBLOCK_CACHE_QUEUE_SIZE_KB = + "dfs.cblock.cache.cache.size.in.kb"; + public static final int DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT = 256; + + /** + * Minimum Number of threads that cache pool will use for background I/O. + */ + public static final String DFS_CBLOCK_CACHE_CORE_POOL_SIZE = + "dfs.cblock.cache.core.pool.size"; + public static final int DFS_CBLOCK_CACHE_CORE_POOL_SIZE_DEFAULT = 16; + + /** + * Maximum Number of threads that cache pool will use for background I/O. + */ + + public static final String DFS_CBLOCK_CACHE_MAX_POOL_SIZE = + "dfs.cblock.cache.max.pool.size"; + public static final int DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT = 256; + + /** + * Number of seconds to keep the Thread alive when it is idle. + */ + public static final String DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS = + "dfs.cblock.cache.keep.alive.seconds"; + public static final long DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT = 60; + + /** + * Priority of cache flusher thread, affecting the relative performance of + * write and read. + */ + public static final String DFS_CBLOCK_CACHE_THREAD_PRIORITY = + "dfs.cblock.cache.thread.priority"; + public static final int DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT = + NORM_PRIORITY; + + /** + * Block Buffer size in 1024 entries, 128 means 128 * 1024 blockIDs. + */ + public static final String DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE = + "dfs.cblock.cache.block.buffer.size"; + public static final int DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT = 128; + private CBlockConfigKeys() { } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java new file mode 100644 index 0000000000..6b3de941fe --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.util.Time; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Paths; + +/** + * The blockWriter task. + */ +public class BlockWriterTask implements Runnable { + private final LogicalBlock block; + private int tryCount; + private final ContainerCacheFlusher flusher; + private final String dbPath; + private final String fileName; + private static final String RETRY_LOG_PREFIX = "RetryLog"; + + /** + * Constructs a BlockWriterTask. + * + * @param block - Block Information. + * @param flusher - ContainerCacheFlusher. + */ + public BlockWriterTask(LogicalBlock block, ContainerCacheFlusher flusher, + String dbPath, String fileName) { + this.block = block; + this.flusher = flusher; + this.dbPath = dbPath; + tryCount = 0; + this.fileName = fileName; + } + + /** + * When an object implementing interface Runnable is used + * to create a thread, starting the thread causes the object's + * run method to be called in that separately executing + * thread. + *

+ * The general contract of the method run is that it may + * take any action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() { + String containerName = null; + XceiverClientSpi client = null; + flusher.getLOG().debug( + "Writing block to remote. block ID: {}", block.getBlockID()); + try { + incTryCount(); + Pipeline pipeline = flusher.getPipeline(this.dbPath, block.getBlockID()); + client = flusher.getXceiverClientManager().acquireClient(pipeline); + byte[] keybuf = Longs.toByteArray(block.getBlockID()); + byte[] data; + long startTime = Time.monotonicNow(); + data = flusher.getCacheDB(this.dbPath).get(keybuf); + long endTime = Time.monotonicNow(); + Preconditions.checkState(data.length > 0, "Block data is zero length"); + startTime = Time.monotonicNow(); + // BUG: fix the trace ID. + ContainerProtocolCalls.writeSmallFile(client, containerName, + Long.toString(block.getBlockID()), data, ""); + endTime = Time.monotonicNow(); + flusher.getTargetMetrics().updateContainerWriteLatency( + endTime - startTime); + flusher.getLOG().debug("Time taken for Write Small File : {} ms", + endTime - startTime); + + flusher.incrementremoteIO(); + + } catch (IOException ex) { + flusher.getLOG().error("Writing of block failed, We have attempted " + + "to write this block {} times to the container {}.Trace ID:{}", + this.getTryCount(), containerName, "", ex); + writeRetryBlock(block); + } finally { + flusher.incFinishCount(fileName); + if(client != null) { + flusher.getXceiverClientManager().releaseClient(client); + } + } + } + + + private void writeRetryBlock(LogicalBlock currentBlock) { + boolean append = false; + String retryFileName = + String.format("%s.%d.%s", RETRY_LOG_PREFIX, currentBlock.getBlockID(), + Time.monotonicNow()); + File logDir = new File(this.dbPath); + if (!logDir.exists() && !logDir.mkdirs()) { + flusher.getLOG().error( + "Unable to create the log directory, Crticial error cannot continue"); + return; + } + String log = Paths.get(this.dbPath, retryFileName).toString(); + ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE); + buffer.putLong(currentBlock.getBlockID()); + try { + FileChannel channel = new FileOutputStream(log, append).getChannel(); + channel.write(buffer); + channel.close(); + flusher.processDirtyBlocks(this.dbPath, retryFileName); + } catch (IOException e) { + flusher.getLOG().error("Unable to write the retry block. Block ID: {}", + currentBlock.getBlockID(), e); + } + } + + /** + * Increments the try count. This is done each time we try this block + * write to the container. + */ + private void incTryCount() { + tryCount++; + } + + /** + * Get the retry count. + * + * @return int + */ + public int getTryCount() { + return tryCount; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java new file mode 100644 index 0000000000..2f35668889 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockIStorageImpl.java @@ -0,0 +1,440 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper; + +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.CBlockLocalCache; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.jscsi.target.storage.IStorageModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_TRACE_IO_DEFAULT; + +/** + * The SCSI Target class for CBlockSCSIServer. + */ +final public class CBlockIStorageImpl implements IStorageModule { + private static final Logger LOGGER = + LoggerFactory.getLogger(CBlockIStorageImpl.class); + private static final Logger TRACER = + LoggerFactory.getLogger("TraceIO"); + + private CacheModule cache; + private final long volumeSize; + private final int blockSize; + private final String userName; + private final String volumeName; + private final boolean traceEnabled; + private final Configuration conf; + private final ContainerCacheFlusher flusher; + private List fullContainerList; + + /** + * private: constructs a SCSI Target. + * + * @param config - config + * @param userName - Username + * @param volumeName - Name of the volume + * @param volumeSize - Size of the volume + * @param blockSize - Size of the block + * @param fullContainerList - Ordered list of containers that make up this + * volume. + * @param flusher - flusher which is used to flush data from + * level db cache to containers + * @throws IOException - Throws IOException. + */ + private CBlockIStorageImpl(Configuration config, String userName, + String volumeName, long volumeSize, int blockSize, + List fullContainerList, ContainerCacheFlusher flusher) { + this.conf = config; + this.userName = userName; + this.volumeName = volumeName; + this.volumeSize = volumeSize; + this.blockSize = blockSize; + this.fullContainerList = new ArrayList<>(fullContainerList); + this.flusher = flusher; + this.traceEnabled = conf.getBoolean(DFS_CBLOCK_TRACE_IO, + DFS_CBLOCK_TRACE_IO_DEFAULT); + } + + /** + * private: initialize the cache. + * + * @param xceiverClientManager - client manager that is used for creating new + * connections to containers. + * @param metrics - target metrics to maintain metrics for target server + * @throws IOException - Throws IOException. + */ + private void initCache(XceiverClientManager xceiverClientManager, + CBlockTargetMetrics metrics) throws IOException { + this.cache = CBlockLocalCache.newBuilder() + .setConfiguration(conf) + .setVolumeName(this.volumeName) + .setUserName(this.userName) + .setPipelines(this.fullContainerList) + .setClientManager(xceiverClientManager) + .setBlockSize(blockSize) + .setVolumeSize(volumeSize) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + this.cache.start(); + } + + /** + * Gets a new builder for CBlockStorageImpl. + * + * @return builder + */ + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Get Cache. + * + * @return - Cache + */ + public CacheModule getCache() { + return cache; + } + + /** + * Returns block size of this volume. + * + * @return int size of block for this volume. + */ + @Override + public int getBlockSize() { + return blockSize; + } + + /** + * Checks the index boundary of a block address. + * + * @param logicalBlockAddress the index of the first block of data to be read + * or written + * @param transferLengthInBlocks the total number of consecutive blocks about + * to be read or written + * @return 0 == Success, 1 indicates the LBA address is out of bounds and 2 + * indicates that LBA + transfer size is out of bounds. + */ + @Override + public int checkBounds(long logicalBlockAddress, int transferLengthInBlocks) { + long sizeInBlocks = volumeSize / blockSize; + int res = 0; + if (logicalBlockAddress < 0 || logicalBlockAddress >= sizeInBlocks) { + res = 1; + } + + if (transferLengthInBlocks < 0 || + logicalBlockAddress + transferLengthInBlocks > sizeInBlocks) { + if (res == 0) { + res = 2; + } + } + return res; + } + + /** + * Number of blocks that make up this volume. + * + * @return long - count of blocks. + */ + @Override + public long getSizeInBlocks() { + return volumeSize / blockSize; + } + + /** + * Reads the number of bytes that can be read into the bytes buffer from the + * location indicated. + * + * @param bytes the array into which the data will be copied will be filled + * with data from storage + * @param storageIndex the position of the first byte to be copied + * @throws IOException + */ + @Override + public void read(byte[] bytes, long storageIndex) throws IOException { + int startingIdxInBlock = (int) storageIndex % blockSize; + int idxInBytes = 0; + if (this.traceEnabled) { + TRACER.info("Task=ReadStart,length={},location={}", + bytes.length, storageIndex); + } + while (idxInBytes < bytes.length - 1) { + long blockId = (storageIndex + idxInBytes) / blockSize; + byte[] dataBytes; + + try { + LogicalBlock block = this.cache.get(blockId); + dataBytes = block.getData().array(); + + if (this.traceEnabled) { + TRACER.info("Task=ReadBlock,BlockID={},length={},SHA={}", + blockId, + dataBytes.length, + dataBytes.length > 0 ? DigestUtils.sha256Hex(dataBytes) : null); + } + } catch (IOException e) { + // For an non-existing block cache.get will return a block with zero + // bytes filled. So any error here is a real error. + LOGGER.error("getting errors when reading data:" + e); + throw e; + } + + int length = blockSize - startingIdxInBlock; + if (length > bytes.length - idxInBytes) { + length = bytes.length - idxInBytes; + } + if (dataBytes.length >= length) { + System.arraycopy(dataBytes, startingIdxInBlock, bytes, idxInBytes, + length); + } + startingIdxInBlock = 0; + idxInBytes += length; + } + if (this.traceEnabled) { + TRACER.info("Task=ReadEnd,length={},location={},SHA={}", + bytes.length, storageIndex, DigestUtils.sha256Hex(bytes)); + } + } + + @Override + public void write(byte[] bytes, long storageIndex) throws IOException { + int startingIdxInBlock = (int) storageIndex % blockSize; + int idxInBytes = 0; + if (this.traceEnabled) { + TRACER.info("Task=WriteStart,length={},location={},SHA={}", + bytes.length, storageIndex, + bytes.length > 0 ? DigestUtils.sha256Hex(bytes) : null); + } + + ByteBuffer dataByte = ByteBuffer.allocate(blockSize); + while (idxInBytes < bytes.length - 1) { + long blockId = (storageIndex + idxInBytes) / blockSize; + int length = blockSize - startingIdxInBlock; + if (length > bytes.length - idxInBytes) { + length = bytes.length - idxInBytes; + } + System.arraycopy(bytes, idxInBytes, dataByte.array(), startingIdxInBlock, + length); + this.cache.put(blockId, dataByte.array()); + + if (this.traceEnabled) { + TRACER.info("Task=WriteBlock,BlockID={},length={},SHA={}", + blockId, dataByte.array().length, + dataByte.array().length > 0 ? + DigestUtils.sha256Hex(dataByte.array()) : null); + } + dataByte.clear(); + startingIdxInBlock = 0; + idxInBytes += length; + } + + if (this.traceEnabled) { + TRACER.info("Task=WriteEnd,length={},location={} ", + bytes.length, storageIndex); + } + } + + @Override + public void close() throws IOException { + try { + cache.close(); + } catch (IllegalStateException ise) { + LOGGER.error("Can not close the storage {}", ise); + throw ise; + } + } + + /** + * Builder class for CBlocklocalCache. + */ + public static class Builder { + private String userName; + private String volumeName; + private long volumeSize; + private int blockSize; + private List containerList; + private Configuration conf; + private XceiverClientManager clientManager; + private ContainerCacheFlusher flusher; + private CBlockTargetMetrics metrics; + + /** + * Constructs a builder. + */ + Builder() { + + } + + public Builder setFlusher(ContainerCacheFlusher cacheFlusher) { + this.flusher = cacheFlusher; + return this; + } + + /** + * set config. + * + * @param config - config + * @return Builder + */ + public Builder setConf(Configuration config) { + this.conf = config; + return this; + } + + /** + * set user name. + * + * @param cblockUserName - user name + * @return Builder + */ + public Builder setUserName(String cblockUserName) { + this.userName = cblockUserName; + return this; + } + + /** + * set volume name. + * + * @param cblockVolumeName -- volume name + * @return Builder + */ + public Builder setVolumeName(String cblockVolumeName) { + this.volumeName = cblockVolumeName; + return this; + } + + /** + * set volume size. + * + * @param cblockVolumeSize -- set volume size. + * @return Builder + */ + public Builder setVolumeSize(long cblockVolumeSize) { + this.volumeSize = cblockVolumeSize; + return this; + } + + /** + * set block size. + * + * @param cblockBlockSize -- block size + * @return Builder + */ + public Builder setBlockSize(int cblockBlockSize) { + this.blockSize = cblockBlockSize; + return this; + } + + /** + * Set contianer list. + * + * @param cblockContainerList - set the pipeline list + * @return Builder + */ + public Builder setContainerList(List cblockContainerList) { + this.containerList = cblockContainerList; + return this; + } + + /** + * Set client manager. + * + * @param xceiverClientManager -- sets the client manager. + * @return Builder + */ + public Builder setClientManager(XceiverClientManager xceiverClientManager) { + this.clientManager = xceiverClientManager; + return this; + } + + /** + * Set Cblock Target Metrics. + * + * @param targetMetrics -- sets the cblock target metrics + * @return Builder + */ + public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) { + this.metrics = targetMetrics; + return this; + } + + /** + * Builds the CBlockStorageImpl. + * + * @return builds the CBlock Scsi Target. + */ + public CBlockIStorageImpl build() throws IOException { + if (StringUtils.isBlank(userName)) { + throw new IllegalArgumentException("User name cannot be null or empty" + + "."); + } + if (StringUtils.isBlank(volumeName)) { + throw new IllegalArgumentException("Volume name cannot be null or " + + "empty"); + } + + if (volumeSize < 1) { + throw new IllegalArgumentException("Volume size cannot be negative or" + + " zero."); + } + + if (blockSize < 1) { + throw new IllegalArgumentException("Block size cannot be negative or " + + "zero."); + } + + if (containerList == null || containerList.size() == 0) { + throw new IllegalArgumentException("Container list cannot be null or " + + "empty"); + } + if (clientManager == null) { + throw new IllegalArgumentException("Client manager cannot be null"); + } + if (conf == null) { + throw new IllegalArgumentException("Configuration cannot be null"); + } + + if (flusher == null) { + throw new IllegalArgumentException("Flusher Cannot be null."); + } + CBlockIStorageImpl impl = new CBlockIStorageImpl(this.conf, this.userName, + this.volumeName, this.volumeSize, this.blockSize, this.containerList, + this.flusher); + impl.initCache(this.clientManager, this.metrics); + return impl; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java new file mode 100644 index 0000000000..ea52c76e16 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/CBlockTargetMetrics.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.cblock.jscsiHelper; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +/** + * This class is for maintaining the various Cblock Target statistics + * and publishing them through the metrics interfaces. + * This also registers the JMX MBean for RPC. + * + * This class maintains stats like cache hit and miss ratio + * as well as the latency time of read and write ops. + */ +public class CBlockTargetMetrics { + @Metric private MutableCounterLong numReadOps; + @Metric private MutableCounterLong numWriteOps; + @Metric private MutableCounterLong numReadCacheHits; + @Metric private MutableCounterLong numReadCacheMiss; + @Metric private MutableCounterLong numReadLostBlocks; + + @Metric private MutableRate dbReadLatency; + @Metric private MutableRate containerReadLatency; + + @Metric private MutableRate dbWriteLatency; + @Metric private MutableRate containerWriteLatency; + + public CBlockTargetMetrics() { + } + + public static CBlockTargetMetrics create() { + MetricsSystem ms = DefaultMetricsSystem.instance(); + return ms.register("CBlockTargetMetrics", + "CBlock Target Metrics", + new CBlockTargetMetrics()); + } + + public void incNumReadOps() { + numReadOps.incr(); + } + + public void incNumWriteOps() { + numWriteOps.incr(); + } + + public void incNumReadCacheHits() { + numReadCacheHits.incr(); + } + + public void incNumReadCacheMiss() { + numReadCacheMiss.incr(); + } + + public void incNumReadLostBlocks() { + numReadLostBlocks.incr(); + } + + public void updateDBReadLatency(long latency) { + dbReadLatency.add(latency); + } + + public void updateContainerReadLatency(long latency) { + containerReadLatency.add(latency); + } + + public void updateDBWriteLatency(long latency) { + dbWriteLatency.add(latency); + } + + public void updateContainerWriteLatency(long latency) { + containerWriteLatency.add(latency); + } + + @VisibleForTesting + public long getNumReadOps() { + return numReadOps.value(); + } + + @VisibleForTesting + public long getNumWriteOps() { + return numWriteOps.value(); + } + + @VisibleForTesting + public long getNumReadCacheHits() { + return numReadCacheHits.value(); + } + + @VisibleForTesting + public long getNumReadCacheMiss() { + return numReadCacheMiss.value(); + } + + @VisibleForTesting + public long getNumReadLostBlocks() { + return numReadLostBlocks.value(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java new file mode 100644 index 0000000000..9aa6f17025 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/ContainerCacheFlusher.java @@ -0,0 +1,512 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.cblock.jscsiHelper.cache.impl.DiskBlock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.utils.LevelDBStore; +import org.iq80.leveldb.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_CORE_POOL_SIZE; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_CORE_POOL_SIZE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_MAX_POOL_SIZE; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_THREAD_PRIORITY; +import static org.apache.hadoop.cblock.CBlockConfigKeys + .DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY; + +/** + * Class that writes to remote containers. + */ +public class ContainerCacheFlusher implements Runnable { + private static final Logger LOG = + LoggerFactory.getLogger(ContainerCacheFlusher.class); + private final LinkedBlockingQueue messageQueue; + private final ThreadPoolExecutor threadPoolExecutor; + private final ArrayBlockingQueue workQueue; + private final ConcurrentMap dbMap; + private final ByteBuffer blockIDBuffer; + private final ConcurrentMap pipelineMap; + private final AtomicLong remoteIO; + private final XceiverClientManager xceiverClientManager; + private final CBlockTargetMetrics metrics; + private AtomicBoolean shutdown; + + private final ConcurrentMap finishCountMap; + + /** + * Constructs the writers to remote queue. + */ + public ContainerCacheFlusher(Configuration config, + XceiverClientManager xceiverClientManager, + CBlockTargetMetrics metrics) { + int queueSize = config.getInt(DFS_CBLOCK_CACHE_QUEUE_SIZE_KB, + DFS_CBLOCK_CACHE_QUEUE_SIZE_KB_DEFAULT) * 1024; + int corePoolSize = config.getInt(DFS_CBLOCK_CACHE_CORE_POOL_SIZE, + DFS_CBLOCK_CACHE_CORE_POOL_SIZE_DEFAULT); + int maxPoolSize = config.getInt(DFS_CBLOCK_CACHE_MAX_POOL_SIZE, + DFS_CBLOCK_CACHE_MAX_POOL_SIZE_DEFAULT); + long keepAlive = config.getLong(DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS, + DFS_CBLOCK_CACHE_KEEP_ALIVE_SECONDS_DEFAULT); + int threadPri = config.getInt(DFS_CBLOCK_CACHE_THREAD_PRIORITY, + DFS_CBLOCK_CACHE_THREAD_PRIORITY_DEFAULT); + int blockBufferSize = config.getInt(DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE, + DFS_CBLOCK_CACHE_BLOCK_BUFFER_SIZE_DEFAULT) * 1024; + + LOG.info("Cache: Core Pool Size: {}", corePoolSize); + LOG.info("Cache: Keep Alive: {}", keepAlive); + LOG.info("Cache: Max Pool Size: {}", maxPoolSize); + LOG.info("Cache: Thread Pri: {}", threadPri); + LOG.info("Cache: BlockBuffer Size: {}", blockBufferSize); + + shutdown = new AtomicBoolean(false); + messageQueue = new LinkedBlockingQueue<>(); + workQueue = new ArrayBlockingQueue<>(queueSize, true); + + ThreadFactory workerThreadFactory = new ThreadFactoryBuilder() + .setNameFormat("Cache Block Writer Thread #%d") + .setDaemon(true) + .setPriority(threadPri) + .build(); + threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, + keepAlive, TimeUnit.SECONDS, workQueue, workerThreadFactory, + new ThreadPoolExecutor.AbortPolicy()); + threadPoolExecutor.prestartAllCoreThreads(); + + dbMap = new ConcurrentHashMap<>(); + pipelineMap = new ConcurrentHashMap<>(); + blockIDBuffer = ByteBuffer.allocateDirect(blockBufferSize); + this.xceiverClientManager = xceiverClientManager; + this.metrics = metrics; + this.remoteIO = new AtomicLong(); + + this.finishCountMap = new ConcurrentHashMap<>(); + checkExisitingDirtyLog(config); + } + + private void checkExisitingDirtyLog(Configuration config) { + File dbPath = Paths.get(config.get(DFS_CBLOCK_DISK_CACHE_PATH_KEY, + DFS_CBLOCK_DISK_CACHE_PATH_DEFAULT)).toFile(); + if (!dbPath.exists()) { + LOG.info("No existing dirty log found at {}", dbPath); + return; + } + LOG.info("Need to check and requeue existing dirty log {}", dbPath); + HashMap> allFiles = new HashMap<>(); + traverse(dbPath, allFiles); + for (Map.Entry> entry : allFiles.entrySet()) { + String parentPath = entry.getKey(); + for (String fileName : entry.getValue()) { + LOG.info("found this {} with {}", parentPath, fileName); + processDirtyBlocks(parentPath, fileName); + } + } + } + + private void traverse(File path, HashMap> files) { + if (path.isFile()) { + if (path.getName().startsWith("DirtyLog")) { + LOG.debug("found this {} with {}", path.getParent(), path.getName()); + if (!files.containsKey(path.getParent())) { + files.put(path.getParent(), new ArrayList<>()); + } + files.get(path.getParent()).add(path.getName()); + } + } else { + File[] listFiles = path.listFiles(); + if (listFiles != null) { + for (File subPath : listFiles) { + traverse(subPath, files); + } + } + } + } + + /** + * Gets the CBlockTargetMetrics. + * + * @return CBlockTargetMetrics + */ + public CBlockTargetMetrics getTargetMetrics() { + return metrics; + } + + /** + * Gets the getXceiverClientManager. + * + * @return XceiverClientManager + */ + public XceiverClientManager getXceiverClientManager() { + return xceiverClientManager; + } + + /** + * Shutdown this instance. + */ + public void shutdown() { + this.shutdown.set(true); + threadPoolExecutor.shutdown(); + } + + public long incrementremoteIO() { + return remoteIO.incrementAndGet(); + } + + /** + * Processes a block cache file and queues those blocks for the remote I/O. + * + * @param dbPath - Location where the DB can be found. + * @param fileName - Block Cache File Name + */ + public void processDirtyBlocks(String dbPath, String fileName) { + LOG.info("Adding {}/{} to queue. Queue Length: {}", dbPath, fileName, + messageQueue.size()); + this.messageQueue.add(new Message(dbPath, fileName)); + } + + public Logger getLOG() { + return LOG; + } + + /** + * Opens a DB if needed or returns a handle to an already open DB. + * + * @param dbPath -- dbPath + * @param cacheSize - cacheSize + * @return the levelDB on the given path. + * @throws IOException + */ + public synchronized LevelDBStore openDB(String dbPath, int cacheSize) + throws IOException { + if (dbMap.containsKey(dbPath)) { + RefCountedDB refDB = dbMap.get(dbPath); + refDB.open(); + return refDB.db; + } else { + Options options = new Options(); + options.cacheSize(cacheSize * (1024L * 1024L)); + options.createIfMissing(true); + LevelDBStore cacheDB = new LevelDBStore( + new File(getDBFileName(dbPath)), options); + RefCountedDB refDB = new RefCountedDB(dbPath, cacheDB); + dbMap.put(dbPath, refDB); + return cacheDB; + } + } + + /** + * Updates the contianer map. This data never changes so we will update this + * during restarts and it should not hurt us. + * + * @param dbPath - DbPath + * @param containerList - Contianer List. + */ + public void register(String dbPath, Pipeline[] containerList) { + pipelineMap.put(dbPath, containerList); + } + + private String getDBFileName(String dbPath) { + return dbPath + ".db"; + } + + LevelDBStore getCacheDB(String dbPath) { + return dbMap.get(dbPath).db; + } + + /** + * Close the DB if we don't have any outstanding refrences. + * + * @param dbPath - dbPath + * @throws IOException + */ + public synchronized void closeDB(String dbPath) throws IOException { + if (dbMap.containsKey(dbPath)) { + RefCountedDB refDB = dbMap.get(dbPath); + int count = refDB.close(); + if (count == 0) { + dbMap.remove(dbPath); + } + } + } + + Pipeline getPipeline(String dbPath, long blockId) { + Pipeline[] containerList = pipelineMap.get(dbPath); + Preconditions.checkNotNull(containerList); + int containerIdx = (int) blockId % containerList.length; + long cBlockIndex = + Longs.fromByteArray(containerList[containerIdx].getData()); + if (cBlockIndex > 0) { + // This catches the case when we get a wrong container in the ordering + // of the containers. + Preconditions.checkState(containerIdx % cBlockIndex == 0, + "The container ID computed should match with the container index " + + "returned from cBlock Server."); + } + return containerList[containerIdx]; + } + + public void incFinishCount(String fileName) { + if (!finishCountMap.containsKey(fileName)) { + LOG.error("No record for such file:" + fileName); + return; + } + finishCountMap.get(fileName).incCount(); + if (finishCountMap.get(fileName).isFileDeleted()) { + finishCountMap.remove(fileName); + } + } + + /** + * When an object implementing interface Runnable is used + * to create a thread, starting the thread causes the object's + * run method to be called in that separately executing + * thread. + *

+ * The general contract of the method run is that it may + * take any action whatsoever. + * + * @see Thread#run() + */ + @Override + public void run() { + while (!this.shutdown.get()) { + try { + Message message = messageQueue.take(); + LOG.debug("Got message to process -- DB Path : {} , FileName; {}", + message.getDbPath(), message.getFileName()); + String fullPath = Paths.get(message.getDbPath(), + message.getFileName()).toString(); + ReadableByteChannel fileChannel = new FileInputStream(fullPath) + .getChannel(); + // TODO: We can batch and unique the IOs here. First getting the code + // to work, we will add those later. + int bytesRead = fileChannel.read(blockIDBuffer); + LOG.debug("Read blockID log of size: {} position {} remaining {}", + bytesRead, blockIDBuffer.position(), blockIDBuffer.remaining()); + // current position of in the buffer in bytes, divided by number of + // bytes per long (which is calculated by number of bits per long + // divided by number of bits per byte) gives the number of blocks + int blockCount = blockIDBuffer.position()/(Long.SIZE / Byte.SIZE); + if (finishCountMap.containsKey(message.getFileName())) { + // In theory this should never happen. But if it happened, + // we need to know it... + LOG.error("Adding DirtyLog file again {} current count {} new {}", + message.getFileName(), + finishCountMap.get(message.getFileName()).expectedCount, + blockCount); + } + finishCountMap.put(message.getFileName(), + new FinishCounter(blockCount, message.getDbPath(), + message.getFileName())); + // should be flip instead of rewind, because we also need to make sure + // the end position is correct. + blockIDBuffer.flip(); + LOG.debug("Remaining blocks count {} and {}", blockIDBuffer.remaining(), + blockCount); + while (blockIDBuffer.remaining() >= (Long.SIZE / Byte.SIZE)) { + long blockID = blockIDBuffer.getLong(); + LogicalBlock block = new DiskBlock(blockID, null, false); + BlockWriterTask blockWriterTask = new BlockWriterTask(block, this, + message.getDbPath(), message.getFileName()); + threadPoolExecutor.submit(blockWriterTask); + } + blockIDBuffer.clear(); + } catch (InterruptedException e) { + LOG.info("ContainerCacheFlusher is interrupted.", e); + } catch (FileNotFoundException e) { + LOG.error("Unable to find the dirty blocks file. This will cause " + + "data errors. Please stop using this volume.", e); + } catch (IOException e) { + LOG.error("Unable to read the dirty blocks file. This will cause " + + "data errors. Please stop using this volume.", e); + } catch (Exception e) { + LOG.error("Generic exception.", e); + } + } + LOG.info("Exiting flusher"); + } + + /** + * Keeps a Reference counted DB that we close only when the total Reference + * has gone to zero. + */ + private static class RefCountedDB { + private LevelDBStore db; + private AtomicInteger refcount; + private String dbPath; + + /** + * RefCountedDB DB ctor. + * + * @param dbPath - DB path. + * @param db - LevelDBStore db + */ + RefCountedDB(String dbPath, LevelDBStore db) { + this.db = db; + this.refcount = new AtomicInteger(1); + this.dbPath = dbPath; + } + + /** + * close the DB if possible. + */ + public int close() throws IOException { + int count = this.refcount.decrementAndGet(); + if (count == 0) { + LOG.info("Closing the LevelDB. {} ", this.dbPath); + db.close(); + } + return count; + } + + public void open() { + this.refcount.incrementAndGet(); + } + } + + /** + * The message held in processing queue. + */ + private static class Message { + private String dbPath; + private String fileName; + + /** + * A message that holds the info about which path dirty blocks log and + * which path contains db. + * + * @param dbPath + * @param fileName + */ + Message(String dbPath, String fileName) { + this.dbPath = dbPath; + this.fileName = fileName; + } + + public String getDbPath() { + return dbPath; + } + + public void setDbPath(String dbPath) { + this.dbPath = dbPath; + } + + public String getFileName() { + return fileName; + } + + public void setFileName(String fileName) { + this.fileName = fileName; + } + } + + private static class FinishCounter { + private final long expectedCount; + private final String dbPath; + private final String dirtyLogPath; + private final AtomicLong currentCount; + private AtomicBoolean fileDeleted; + + FinishCounter(long expectedCount, String dbPath, + String dirtyLogPath) { + this.expectedCount = expectedCount; + this.dbPath = dbPath; + this.dirtyLogPath = dirtyLogPath; + this.currentCount = new AtomicLong(0); + this.fileDeleted = new AtomicBoolean(false); + } + + public boolean isFileDeleted() { + return fileDeleted.get(); + } + + public void incCount() { + long count = this.currentCount.incrementAndGet(); + if (count >= expectedCount) { + String filePath = String.format("%s/%s", dbPath, dirtyLogPath); + LOG.debug( + "Deleting {} with count {} {}", filePath, count, expectedCount); + try { + Path path = Paths.get(filePath); + Files.delete(path); + // the following part tries to remove the directory if it is empty + // but not sufficient, because the .db directory still exists.... + // TODO how to handle the .db directory? + /*Path parent = path.getParent(); + if (parent.toFile().listFiles().length == 0) { + Files.delete(parent); + }*/ + fileDeleted.set(true); + } catch (IOException e) { + LOG.error( + "Error deleting dirty log file {} {}", filePath, e.toString()); + } + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java new file mode 100644 index 0000000000..300b2aeff8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/CacheModule.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper.cache; + +import java.io.IOException; + +/** + * Defines the interface for cache implementations. The cache will be called + * by cblock storage module when it performs IO operations. + */ +public interface CacheModule { + /** + * check if the key is cached, if yes, returned the cached object. + * otherwise, load from data source. Then put it into cache. + * + * @param blockID + * @return the target block. + */ + LogicalBlock get(long blockID) throws IOException; + + /** + * put the value of the key into cache. + * @param blockID + * @param value + */ + void put(long blockID, byte[] value) throws IOException; + + void flush() throws IOException; + + void start() throws IOException; + + void stop() throws IOException; + + void close() throws IOException; + + boolean isDirtyCache(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java new file mode 100644 index 0000000000..470826f0ea --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/LogicalBlock.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper.cache; + +import java.nio.ByteBuffer; + +/** + * Logical Block is the data structure that we write to the cache, + * the key and data gets written to remote contianers. Rest is used for + * book keeping for the cache. + */ +public interface LogicalBlock { + /** + * Returns the data stream of this block. + * @return - ByteBuffer + */ + ByteBuffer getData(); + + /** + * Frees the byte buffer since we don't need it any more. + */ + void clearData(); + + /** + * Returns the Block ID for this Block. + * @return long - BlockID + */ + long getBlockID(); + + /** + * Flag that tells us if this block has been persisted to container. + * @return whether this block is now persistent + */ + boolean isPersisted(); +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java new file mode 100644 index 0000000000..743a35b114 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/CBlockLocalCache.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; + +import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; +import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; +import org.apache.hadoop.cblock.jscsiHelper.cache.CacheModule; +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; + +import java.io.IOException; +import java.util.List; + + +/** + * A local cache used by the CBlock ISCSI server. This class is enabled or + * disabled via config settings. + * + * TODO : currently, this class is a just a place holder. + */ +final public class CBlockLocalCache implements CacheModule { + + private CBlockLocalCache() { + } + + @Override + public LogicalBlock get(long blockID) throws IOException { + return null; + } + + @Override + public void put(long blockID, byte[] data) throws IOException { + } + + @Override + public void flush() throws IOException { + + } + + @Override + public void start() throws IOException { + + } + + @Override + public void stop() throws IOException { + + } + + @Override + public void close() throws IOException { + + } + + @Override + public boolean isDirtyCache() { + return false; + } + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * Builder class for CBlocklocalCache. + */ + public static class Builder { + + /** + * Sets the Config to be used by this cache. + * + * @param configuration - Config + * @return Builder + */ + public Builder setConfiguration(Configuration configuration) { + return this; + } + + /** + * Sets the user name who is the owner of this volume. + * + * @param userName - name of the owner, please note this is not the current + * user name. + * @return - Builder + */ + public Builder setUserName(String userName) { + return this; + } + + /** + * Sets the VolumeName. + * + * @param volumeName - Name of the volume + * @return Builder + */ + public Builder setVolumeName(String volumeName) { + return this; + } + + /** + * Sets the Pipelines that form this volume. + * + * @param pipelines - list of pipelines + * @return Builder + */ + public Builder setPipelines(List pipelines) { + return this; + } + + /** + * Sets the Client Manager that manages the communication with containers. + * + * @param clientManager - clientManager. + * @return - Builder + */ + public Builder setClientManager(XceiverClientManager clientManager) { + return this; + } + + /** + * Sets the block size -- Typical sizes are 4KB, 8KB etc. + * + * @param blockSize - BlockSize. + * @return - Builder + */ + public Builder setBlockSize(int blockSize) { + return this; + } + + /** + * Sets the volumeSize. + * + * @param volumeSize - VolumeSize + * @return - Builder + */ + public Builder setVolumeSize(long volumeSize) { + return this; + } + + /** + * Set flusher. + * @param flusher - cache Flusher + * @return Builder. + */ + public Builder setFlusher(ContainerCacheFlusher flusher) { + return this; + } + + /** + * Sets the cblock Metrics. + * + * @param targetMetrics - CBlock Target Metrics + * @return - Builder + */ + public Builder setCBlockTargetMetrics(CBlockTargetMetrics targetMetrics) { + return this; + } + + public CBlockLocalCache build() throws IOException { + return new CBlockLocalCache(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java new file mode 100644 index 0000000000..26c174f828 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/DiskBlock.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; + +import org.apache.hadoop.cblock.jscsiHelper.cache.LogicalBlock; +import java.nio.ByteBuffer; + +/** + * Impl class for LogicalBlock. + */ +public class DiskBlock implements LogicalBlock { + private ByteBuffer data; + private long blockID; + private boolean persisted; + + /** + * Constructs a DiskBlock Class from the following params. + * @param blockID - 64-bit block ID + * @param data - Byte Array + * @param persisted - Flag which tells us if this is persisted to remote + */ + public DiskBlock(long blockID, byte[] data, boolean persisted) { + if (data !=null) { + this.data = ByteBuffer.wrap(data); + } + this.blockID = blockID; + this.persisted = persisted; + } + + @Override + public ByteBuffer getData() { + return data; + } + + /** + * Frees the byte buffer since we don't need it any more. + */ + @Override + public void clearData() { + data.clear(); + } + + @Override + public long getBlockID() { + return blockID; + } + + @Override + public boolean isPersisted() { + return persisted; + } + + /** + * Sets the value of persisted. + * @param value - True if this has been persisted to container, false + * otherwise. + */ + public void setPersisted(boolean value) { + persisted = value; + } + +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java new file mode 100644 index 0000000000..dfac1108eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper.cache.impl; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java new file mode 100644 index 0000000000..47f76b819e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper.cache; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java new file mode 100644 index 0000000000..85f8d6fecf --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock.jscsiHelper; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java new file mode 100644 index 0000000000..b440ae0078 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/TestStorageImpl.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.cblock; + +import com.google.common.primitives.Longs; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.cblock.jscsiHelper.CBlockIStorageImpl; +import org.apache.hadoop.cblock.jscsiHelper.CBlockTargetMetrics; +import org.apache.hadoop.cblock.jscsiHelper.ContainerCacheFlusher; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfiguration; +import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; +import org.apache.hadoop.scm.XceiverClientManager; +import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; +import org.apache.hadoop.scm.storage.ContainerProtocolCalls; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.LinkedList; +import java.util.List; + +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_DISK_CACHE_PATH_KEY; +import static org.apache.hadoop.cblock.CBlockConfigKeys.DFS_CBLOCK_TRACE_IO; + +/** + * This class tests the cblock storage layer. + */ +public class TestStorageImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(TestStorageImpl.class); + private final static long GB = 1024 * 1024 * 1024; + private final static int KB = 1024; + private static MiniOzoneCluster cluster; + private static OzoneConfiguration config; + private static StorageContainerLocationProtocolClientSideTranslatorPB + storageContainerLocationClient; + private static XceiverClientManager xceiverClientManager; + + @BeforeClass + public static void init() throws IOException { + config = new OzoneConfiguration(); + URL p = config.getClass().getResource(""); + String path = p.getPath().concat( + TestOzoneContainer.class.getSimpleName()); + config.set(DFS_CBLOCK_DISK_CACHE_PATH_KEY, path); + config.setBoolean(DFS_CBLOCK_TRACE_IO, true); + cluster = new MiniOzoneCluster.Builder(config) + .numDataNodes(1).setHandlerType("distributed").build(); + storageContainerLocationClient = cluster + .createStorageContainerLocationClient(); + xceiverClientManager = new XceiverClientManager(config); + } + + @AfterClass + public static void shutdown() throws InterruptedException { + if (cluster != null) { + cluster.shutdown(); + } + IOUtils.cleanup(null, storageContainerLocationClient, cluster); + } + + /** + * getContainerPipelines creates a set of containers and returns the + * Pipelines that define those containers. + * + * @param count - Number of containers to create. + * @return - List of Pipelines. + * @throws IOException + */ + private List getContainerPipeline(int count) throws IOException { + List containerPipelines = new LinkedList<>(); + for (int x = 0; x < count; x++) { + String traceID = "trace" + RandomStringUtils.randomNumeric(4); + String containerName = "container" + RandomStringUtils.randomNumeric(10); + Pipeline pipeline = + storageContainerLocationClient.allocateContainer(containerName); + XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline); + ContainerProtocolCalls.createContainer(client, traceID); + // This step is needed since we set private data on pipelines, when we + // read the list from CBlockServer. So we mimic that action here. + pipeline.setData(Longs.toByteArray(x)); + containerPipelines.add(pipeline); + } + return containerPipelines; + } + + @Test + public void testStorageImplBasicReadWrite() throws Exception { + OzoneConfiguration oConfig = new OzoneConfiguration(); + String userName = "user" + RandomStringUtils.randomNumeric(5); + String volumeName = "volume" + RandomStringUtils.randomNumeric(5); + long volumeSize = 50L * (1024L * 1024L * 1024L); + int blockSize = 4096; + byte[] data = + RandomStringUtils.randomAlphanumeric(10 * (1024 * 1024)) + .getBytes(StandardCharsets.UTF_8); + String hash = DigestUtils.sha256Hex(data); + CBlockTargetMetrics metrics = CBlockTargetMetrics.create(); + ContainerCacheFlusher flusher = new ContainerCacheFlusher(oConfig, + xceiverClientManager, metrics); + CBlockIStorageImpl ozoneStore = CBlockIStorageImpl.newBuilder() + .setUserName(userName) + .setVolumeName(volumeName) + .setVolumeSize(volumeSize) + .setBlockSize(blockSize) + .setContainerList(getContainerPipeline(10)) + .setClientManager(xceiverClientManager) + .setConf(oConfig) + .setFlusher(flusher) + .setCBlockTargetMetrics(metrics) + .build(); + ozoneStore.write(data, 0); + + // Currently, local cache is a placeholder and does not actually handle + // read and write. So the below write is guaranteed to fail. After + // CBlockLocalCache is properly implemented, we should uncomment the + // following lines + // TODO uncomment the following. + + //byte[] newData = new byte[10 * 1024 * 1024]; + //ozoneStore.read(newData, 0); + //String newHash = DigestUtils.sha256Hex(newData); + //Assert.assertEquals("hashes don't match.", hash, newHash); + GenericTestUtils.waitFor(() -> !ozoneStore.getCache().isDirtyCache(), + 100, 20 * + 1000); + + ozoneStore.close(); + } +}