diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
index d77091f863..87f40b324c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/cblock/CBlockConfigKeys.java
@@ -172,6 +172,21 @@ public final class CBlockConfigKeys {
   public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
       64 * 1024;
 
+  /**
+   * CBlock CLI configs.
+   */
+  public static final String DFS_CBLOCK_MANAGER_POOL_SIZE =
+      "dfs.cblock.manager.pool.size";
+  public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16;
+
+  /**
+   * Currently the largest supported volume is about 8 TB, which might take
+   * well over 20 seconds to create containers; hence a generous 300s default.
+   */
+  public static final String DFS_CBLOCK_RPC_TIMEOUT_SECONDS =
+      "dfs.cblock.rpc.timeout.seconds";
+  public static final int DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT = 300;
+
   private CBlockConfigKeys() {
 
   }
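For reference, a minimal sketch of how the two new keys resolve at runtime, mirroring the lookups this patch adds in CBlockVolumeClient and StorageManager (illustrative only; note the seconds-to-milliseconds conversion the client performs for the RPC proxy):

    OzoneConfiguration conf = new OzoneConfiguration();
    // 16 threads unless dfs.cblock.manager.pool.size is overridden.
    int poolSize = conf.getInt(
        CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
        CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
    // 300 seconds by default; the RPC layer expects milliseconds.
    int rpcTimeoutMs = conf.getInt(
        CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS,
        CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT) * 1000;
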
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
index 90a16cef32..11965a38b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/client/CBlockVolumeClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.cblock.client;
 
+import org.apache.hadoop.cblock.CBlockConfigKeys;
 import org.apache.hadoop.cblock.meta.VolumeInfo;
 import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -36,32 +37,25 @@
  */
 public class CBlockVolumeClient {
   private final CBlockServiceProtocolClientSideTranslatorPB cblockClient;
-  private final OzoneConfiguration conf;
 
   public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
-    this.conf = conf;
-    long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
-    InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
-    // currently the largest supported volume is about 8TB, which might take
-    // > 20 seconds to finish creating containers. thus set timeout to 30 sec.
-    cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
-        RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
-            address, UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
-                .retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
-                    .SECONDS)).getProxy());
+    this(conf, null);
   }
 
   public CBlockVolumeClient(OzoneConfiguration conf,
       InetSocketAddress serverAddress) throws IOException {
-    this.conf = conf;
+    InetSocketAddress address = serverAddress != null ? serverAddress :
+        OzoneClientUtils.getCblockServiceRpcAddr(conf);
     long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
+    int rpcTimeout =
+        conf.getInt(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS,
+            CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT) * 1000;
     cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
         RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
-            serverAddress, UserGroupInformation.getCurrentUser(), conf,
-            NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
-                .retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
-                    .SECONDS)).getProxy());
+            address, UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies
+                .retryUpToMaximumCountWithFixedSleep(
+                    300, 1, TimeUnit.SECONDS)).getProxy());
   }
 
   public void createVolume(String userName, String volumeName,
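With the duplicated proxy setup removed, both constructors share one code path: the single-argument form delegates with a null address, which falls back to OzoneClientUtils.getCblockServiceRpcAddr(conf). A hypothetical usage sketch (the address and port below are illustrative, not defaults from this patch):

    OzoneConfiguration conf = new OzoneConfiguration();
    // Production callers let the address come from configuration.
    CBlockVolumeClient client = new CBlockVolumeClient(conf);
    // Tests can pin the server address explicitly.
    CBlockVolumeClient testClient = new CBlockVolumeClient(conf,
        new InetSocketAddress("127.0.0.1", 9810));
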
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
index c446dacae5..04fe3a4544 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/BlockWriterTask.java
@@ -77,6 +77,7 @@ public void run() {
     String containerName = null;
     XceiverClientSpi client = null;
     LevelDBStore levelDBStore = null;
+    String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID());
     flusher.getLOG().debug(
         "Writing block to remote. block ID: {}", block.getBlockID());
     try {
@@ -94,8 +95,7 @@ public void run() {
       Preconditions.checkState(data.length > 0, "Block data is zero length");
       startTime = Time.monotonicNow();
       ContainerProtocolCalls.writeSmallFile(client, containerName,
-          Long.toString(block.getBlockID()), data,
-          flusher.getTraceID(new File(dbPath), block.getBlockID()));
+          Long.toString(block.getBlockID()), data, traceID);
       endTime = Time.monotonicNow();
       flusher.getTargetMetrics().updateContainerWriteLatency(
           endTime - startTime);
@@ -107,7 +107,7 @@ public void run() {
     } catch (Exception ex) {
       flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
           "to write this block {} times to the container {}.Trace ID:{}",
-          block.getBlockID(), this.getTryCount(), containerName, "", ex);
+          block.getBlockID(), this.getTryCount(), containerName, traceID, ex);
       writeRetryBlock(block);
       if (ex instanceof IOException) {
         flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
index 6ee67c0589..992578fa83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/jscsiHelper/cache/impl/AsyncBlockWriter.java
@@ -151,6 +151,7 @@ public long getLocalIOCount() {
    */
   public void writeBlock(LogicalBlock block) throws IOException {
     byte[] keybuf = Longs.toByteArray(block.getBlockID());
+    String traceID = parentCache.getTraceID(block.getBlockID());
     if (parentCache.isShortCircuitIOEnabled()) {
       long startTime = Time.monotonicNow();
       getCacheDB().put(keybuf, block.getData().array());
@@ -176,7 +177,7 @@ public void writeBlock(LogicalBlock block) throws IOException {
           .acquireClient(parentCache.getPipeline(block.getBlockID()));
       ContainerProtocolCalls.writeSmallFile(client, containerName,
           Long.toString(block.getBlockID()), block.getData().array(),
-          parentCache.getTraceID(block.getBlockID()));
+          traceID);
       long endTime = Time.monotonicNow();
       if (parentCache.isTraceEnabled()) {
         String datahash = DigestUtils.sha256Hex(block.getData().array());
@@ -189,8 +190,9 @@ public void writeBlock(LogicalBlock block) throws IOException {
       parentCache.getTargetMetrics().incNumDirectBlockWrites();
     } catch (Exception ex) {
       parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
-      LOG.error("Direct I/O writing of block:{} to container {} failed",
-          block.getBlockID(), containerName, ex);
+      LOG.error("Direct I/O writing of block:{} traceID:{} to " +
+          "container {} failed", block.getBlockID(), traceID,
+          containerName, ex);
       throw ex;
     } finally {
       if (client != null) {
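Hoisting traceID into a local means the failure log carries the same ID as the write call, so a failed request can be correlated end to end. The calls above also keep the exception as a trailing extra argument, which matters for SLF4J; a standalone sketch of that general rule (not CBlock-specific code):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // SLF4J prints a full stack trace only when the exception is passed as
    // an extra argument with no matching {} placeholder, as the calls above
    // do; give it a placeholder and only ex.toString() is logged.
    Logger log = LoggerFactory.getLogger("demo");
    Exception ex = new java.io.IOException("disk full");
    log.error("write of block:{} failed", 42L, ex); // stack trace printed
    log.error("write failed error:{}", ex);         // only ex.toString()
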
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
index c0031bae27..4f5930d98c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/meta/VolumeDescriptor.java
@@ -29,6 +29,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * The internal representation maintained by CBlock server as the info for
@@ -53,7 +54,7 @@ public class VolumeDescriptor {
   private static final Logger LOG =
       LoggerFactory.getLogger(VolumeDescriptor.class);
 
-  private HashMap<String, ContainerDescriptor> containerMap;
+  private ConcurrentHashMap<String, ContainerDescriptor> containerMap;
   private String userName;
   private int blockSize;
   private long volumeSize;
@@ -72,13 +73,12 @@ public class VolumeDescriptor {
    * and set*() methods are for the same purpose also.
    */
   public VolumeDescriptor() {
-    containerMap = new HashMap<>();
-    containerIdOrdered = new ArrayList<>();
+    this(null, null, 0, 0);
   }
 
   public VolumeDescriptor(String userName, String volumeName,
       long volumeSize, int blockSize) {
-    this.containerMap = new HashMap<>();
+    this.containerMap = new ConcurrentHashMap<>();
     this.userName = userName;
     this.volumeName = volumeName;
     this.blockSize = blockSize;
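The map type changes because, with the parallel create path added below in StorageManager, multiple CreateContainerTask threads call addContainer() on the same VolumeDescriptor at once, and a plain HashMap is not safe for concurrent put(). A standalone sketch of the hazard being avoided:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Many writer threads, one shared map: each put() is atomic with
    // ConcurrentHashMap; with plain HashMap the same workload can lose
    // updates or corrupt the internal table.
    Map<String, String> containers = new ConcurrentHashMap<>();
    for (int i = 0; i < 16; i++) {
      final int idx = i;
      new Thread(() -> containers.put("container-" + idx, "pipeline")).start();
    }
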
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
index 1f22aa8646..711e76315d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.cblock.storage;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.cblock.CBlockConfigKeys;
 import org.apache.hadoop.cblock.exception.CBlockException;
 import org.apache.hadoop.cblock.meta.ContainerDescriptor;
 import org.apache.hadoop.cblock.meta.VolumeDescriptor;
@@ -37,25 +38,27 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class maintains the key space of CBlock, more specifically, the
  * volume to container mapping. The core data structure
  * is a map from users to their volumes info, where volume info is a handler
- * to a volume, containing information for IO on that volume.
- *
- * and a storage client responsible for talking to the SCM
- *
- * TODO : all the volume operations are fully serialized, which can potentially
- * be optimized.
- *
- * TODO : if the certain operations (e.g. create) failed, the failure-handling
- * logic may not be properly implemented currently.
+ * to a volume, containing information for IO on that volume and a storage
+ * client responsible for talking to the SCM.
 */
 public class StorageManager {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(StorageManager.class);
   private final ScmClient storageClient;
+  private final int numThreads;
+  private static final int MAX_THREADS =
+      Runtime.getRuntime().availableProcessors() * 2;
+  private static final int MAX_QUEUE_CAPACITY = 1024;
+
   /**
    * We will NOT have the situation where same kv pair getting
    * processed, but it is possible to have multiple kv pair being
@@ -78,6 +81,9 @@ public StorageManager(ScmClient storageClient,
     this.storageClient = storageClient;
     this.user2VolumeMap = new ConcurrentHashMap<>();
     this.containerSizeB = storageClient.getContainerSize(null);
+    this.numThreads =
+        ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
+            CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
   }
 
 /**
@@ -149,6 +155,127 @@ public synchronized void addVolume(VolumeDescriptor volumeDescriptor)
     makeVolumeReady(userName, volumeName, volumeDescriptor);
   }
 
+  private class CreateContainerTask implements Runnable {
+    private final VolumeDescriptor volume;
+    private final int containerIdx;
+    private final ArrayList<String> containerIds;
+    private final AtomicInteger numFailed;
+
+    CreateContainerTask(VolumeDescriptor volume, int containerIdx,
+        ArrayList<String> containerIds,
+        AtomicInteger numFailed) {
+      this.volume = volume;
+      this.containerIdx = containerIdx;
+      this.containerIds = containerIds;
+      this.numFailed = numFailed;
+    }
+
+    /**
+     * When an object implementing interface <code>Runnable</code> is used
+     * to create a thread, starting the thread causes the object's
+     * <code>run</code> method to be called in that separately executing
+     * thread.
+     * <p>
+     * The general contract of the method <code>run</code> is that it may
+     * take any action whatsoever.
+     *
+     * @see Thread#run()
+     */
+    public void run() {
+      ContainerDescriptor container = null;
+      try {
+        Pipeline pipeline = storageClient.createContainer(
+            OzoneProtos.ReplicationType.STAND_ALONE,
+            OzoneProtos.ReplicationFactor.ONE,
+            KeyUtil.getContainerName(volume.getUserName(),
+                volume.getVolumeName(), containerIdx));
+
+        container = new ContainerDescriptor(pipeline.getContainerName());
+
+        container.setPipeline(pipeline);
+        container.setContainerIndex(containerIdx);
+        volume.addContainer(container);
+        containerIds.set(containerIdx, container.getContainerID());
+      } catch (Exception e) {
+        numFailed.incrementAndGet();
+        if (container != null) {
+          LOGGER.error("Error creating container Container:{} index:{}",
+              container.getContainerID(), containerIdx, e);
+        } else {
+          LOGGER.error("Error creating container at index:{}",
+              containerIdx, e);
+        }
+      }
+    }
+  }
+
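Both createVolumeContainers and deleteVolumeContainers below build the same bounded pool. A standalone sketch of what those constructor parameters mean, using the values this patch introduces:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Core size is numThreads (dfs.cblock.manager.pool.size, default 16).
    // Extra threads, up to MAX_THREADS, are created only once the 1024-slot
    // queue is full; if the queue is full and the pool is at its maximum,
    // CallerRunsPolicy makes the submitting thread run the task itself,
    // throttling submission instead of throwing RejectedExecutionException.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
        16, Runtime.getRuntime().availableProcessors() * 2,
        1, TimeUnit.SECONDS,                // idle keep-alive for extra threads
        new ArrayBlockingQueue<>(1024),
        new ThreadPoolExecutor.CallerRunsPolicy());
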
+  private boolean createVolumeContainers(VolumeDescriptor volume) {
+    ArrayList<String> containerIds = new ArrayList<>();
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
+        MAX_THREADS, 1, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    AtomicInteger numFailedCreates = new AtomicInteger(0);
+    long allocatedSize = 0;
+    int containerIdx = 0;
+    while (allocatedSize < volume.getVolumeSize()) {
+      // adding null to allocate space in ArrayList
+      containerIds.add(containerIdx, null);
+      Runnable task = new CreateContainerTask(volume, containerIdx,
+          containerIds, numFailedCreates);
+      executor.submit(task);
+      allocatedSize += containerSizeB;
+      containerIdx += 1;
+    }
+
+    // issue the command and then wait for it to finish
+    executor.shutdown();
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error creating volume:{}",
+          volume.getVolumeName(), e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+
+    volume.setContainerIDs(containerIds);
+    return numFailedCreates.get() == 0;
+  }
+
+  private void deleteContainer(String containerID, boolean force) {
+    try {
+      Pipeline pipeline = storageClient.getContainer(containerID);
+      storageClient.deleteContainer(pipeline, force);
+    } catch (Exception e) {
+      LOGGER.error("Error deleting container Container:{}",
+          containerID, e);
+    }
+  }
+
+  private void deleteVolumeContainers(List<String> containers, boolean force)
+      throws CBlockException {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
+        MAX_THREADS, 1, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+    for (String containerID : containers) {
+      if (containerID != null) {
+        Runnable task = () -> deleteContainer(containerID, force);
+        executor.submit(task);
+      }
+    }
+
+    // issue the command and then wait for it to finish
+    executor.shutdown();
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      LOGGER.error("Error deleting containers", e);
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
 
   /**
    * Called by CBlock server when creating a fresh volume. The core
@@ -172,31 +299,13 @@ public synchronized void createVolume(String userName, String volumeName,
       throw new CBlockException("Volume size smaller than block size? " +
           "volume size:" + volumeSize + " block size:" + blockSize);
     }
-    VolumeDescriptor volume;
-    int containerIdx = 0;
-    try {
-      volume = new VolumeDescriptor(userName, volumeName,
-          volumeSize, blockSize);
-      long allocatedSize = 0;
-      ArrayList<String> containerIds = new ArrayList<>();
-      while (allocatedSize < volumeSize) {
-        Pipeline pipeline = storageClient.createContainer(OzoneProtos
-                .ReplicationType.STAND_ALONE,
-            OzoneProtos.ReplicationFactor.ONE,
-            KeyUtil.getContainerName(userName, volumeName, containerIdx));
-        ContainerDescriptor container =
-            new ContainerDescriptor(pipeline.getContainerName());
-        container.setPipeline(pipeline);
-        container.setContainerIndex(containerIdx);
-        volume.addContainer(container);
-        containerIds.add(container.getContainerID());
-        allocatedSize += containerSizeB;
-        containerIdx += 1;
-      }
-      volume.setContainerIDs(containerIds);
-    } catch (IOException e) {
-      throw new CBlockException("Error when creating volume:" + e.getMessage());
-      // TODO : delete already created containers? or re-try policy
+    VolumeDescriptor volume
+        = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize);
+    boolean success = createVolumeContainers(volume);
+    if (!success) {
+      // cleanup the containers and throw the exception
+      deleteVolumeContainers(volume.getContainerIDsList(), true);
+      throw new CBlockException("Error when creating volume:" + volumeName);
     }
     makeVolumeReady(userName, volumeName, volume);
   }
@@ -223,16 +332,7 @@ public synchronized void deleteVolume(String userName, String volumeName,
       throw new CBlockException("Deleting a non-empty volume without force!");
     }
     VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
-    for (String containerID : volume.getContainerIDsList()) {
-      try {
-        Pipeline pipeline = storageClient.getContainer(containerID);
-        storageClient.deleteContainer(pipeline, force);
-      } catch (IOException e) {
-        LOGGER.error("Error deleting container Container:{} error:{}",
-            containerID, e);
-        throw new CBlockException(e.getMessage());
-      }
-    }
+    deleteVolumeContainers(volume.getContainerIDsList(), force);
     if (user2VolumeMap.get(userName).size() == 0) {
       user2VolumeMap.remove(userName);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 4e0028f34f..0860db8013 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -897,6 +897,22 @@
     </description>
   </property>
 
+  <property>
+    <name>dfs.cblock.manager.pool.size</name>
+    <value>16</value>
+    <description>
+      Number of threads that CBlock manager will use for container operations.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.cblock.rpc.timeout.seconds</name>
+    <value>300</value>
+    <description>
+      RPC timeout in seconds used for CBlock CLI operations.
+    </description>
+  </property>
+
   <property>
     <name>dfs.cblock.scm.ipaddress</name>
     <value>127.0.0.1</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
index 8d9c2a0bdb..8cb57d6d1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/ContainerLookUpService.java
@@ -22,7 +22,7 @@
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * NOTE : This class is only for testing purpose.
@@ -34,8 +34,8 @@
  * This is to allow volume creation call and perform standalone tests.
 */
 public final class ContainerLookUpService {
-  private static HashMap<String, ContainerDescriptor>
-      containers = new HashMap<>();
+  private static ConcurrentHashMap<String, ContainerDescriptor>
+      containers = new ConcurrentHashMap<>();
 
   /**
    * Return an *existing* container with given Id.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
index f45ea15f1f..a318876cbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/cblock/util/MockStorageClient.java
@@ -27,6 +27,7 @@
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class is the one that directly talks to SCM server.
@@ -37,7 +38,8 @@
  *
 */
 public class MockStorageClient implements ScmClient {
-  private static long currentContainerId = -1;
+  private static AtomicInteger currentContainerId =
+      new AtomicInteger(0);
 
   /**
    * Ask SCM to get a exclusive container.
@@ -48,9 +50,9 @@ public class MockStorageClient implements ScmClient {
   @Override
   public Pipeline createContainer(String containerId)
       throws IOException {
-    currentContainerId += 1;
-    ContainerLookUpService.addContainer(Long.toString(currentContainerId));
-    return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
+    int contId = currentContainerId.getAndIncrement();
+    ContainerLookUpService.addContainer(Long.toString(contId));
+    return ContainerLookUpService.lookUp(Long.toString(contId))
         .getPipeline();
   }
 
@@ -126,10 +128,11 @@ public long getContainerSize(Pipeline pipeline) throws IOException {
   public Pipeline createContainer(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor replicationFactor, String containerId)
       throws IOException {
-    currentContainerId += 1;
-    ContainerLookUpService.addContainer(Long.toString(currentContainerId));
-    return ContainerLookUpService.lookUp(Long.toString(currentContainerId))
-        .getPipeline();  }
+    int contId = currentContainerId.getAndIncrement();
+    ContainerLookUpService.addContainer(Long.toString(contId));
+    return ContainerLookUpService.lookUp(Long.toString(contId))
+        .getPipeline();
+  }
 
   /**
    * Returns a set of Nodes that meet a query criteria.
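The mock's ID counter moves from a plain `long` with `+= 1` to an AtomicInteger because StorageManager now calls createContainer from many pool threads at once, and an unsynchronized increment on a shared field can hand out duplicate IDs. A standalone sketch of the difference (hypothetical, not part of the patch):

    import java.util.concurrent.atomic.AtomicInteger;

    // getAndIncrement() is an atomic read-modify-write, so every caller
    // observes a distinct value even under contention; "counter += 1"
    // compiles to separate read and write steps and can race.
    AtomicInteger counter = new AtomicInteger(0);
    Runnable create = () -> {
      int id = counter.getAndIncrement(); // unique per invocation
      System.out.println("container-" + id);
    };
    for (int i = 0; i < 8; i++) {
      new Thread(create).start();
    }
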