HDFS-12210. Block Storage: volume creation times out while creating 3TB volume because of too many containers. Contributed by Mukul Kumar Singh.

This commit is contained in:
Chen Liang 2017-09-26 08:28:53 -07:00 committed by Owen O'Malley
parent dddded0016
commit 9ff136bb02
9 changed files with 212 additions and 82 deletions

View File

@ -172,6 +172,21 @@ public final class CBlockConfigKeys {
public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT = public static final int DFS_CBLOCK_CACHE_MAX_RETRY_DEFAULT =
64 * 1024; 64 * 1024;
/**
* Number of threads the CBlock manager uses for container operations.
*/
public static final String DFS_CBLOCK_MANAGER_POOL_SIZE =
"dfs.cblock.manager.pool.size";
public static final int DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT = 16;
/**
* Currently the largest supported volume is about 8TB, which might take
* > 20 seconds to finish creating containers, thus the default RPC timeout
* is set to 300 seconds.
*/
public static final String DFS_CBLOCK_RPC_TIMEOUT_SECONDS =
"dfs.cblock.rpc.timeout.seconds";
public static final int DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT = 300;
private CBlockConfigKeys() { private CBlockConfigKeys() {
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.cblock.client; package org.apache.hadoop.cblock.client;
import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.meta.VolumeInfo; import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB; import org.apache.hadoop.cblock.protocolPB.CBlockServiceProtocolPB;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
@ -36,32 +37,25 @@
*/ */
public class CBlockVolumeClient { public class CBlockVolumeClient {
private final CBlockServiceProtocolClientSideTranslatorPB cblockClient; private final CBlockServiceProtocolClientSideTranslatorPB cblockClient;
private final OzoneConfiguration conf;
public CBlockVolumeClient(OzoneConfiguration conf) throws IOException { public CBlockVolumeClient(OzoneConfiguration conf) throws IOException {
this.conf = conf; this(conf, null);
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
InetSocketAddress address = OzoneClientUtils.getCblockServiceRpcAddr(conf);
// currently the largest supported volume is about 8TB, which might take
// > 20 seconds to finish creating containers. thus set timeout to 30 sec.
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies
.retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit
.SECONDS)).getProxy());
} }
public CBlockVolumeClient(OzoneConfiguration conf, public CBlockVolumeClient(OzoneConfiguration conf,
InetSocketAddress serverAddress) throws IOException { InetSocketAddress serverAddress) throws IOException {
this.conf = conf; InetSocketAddress address = serverAddress != null ? serverAddress :
OzoneClientUtils.getCblockServiceRpcAddr(conf);
long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class); long version = RPC.getProtocolVersion(CBlockServiceProtocolPB.class);
int rpcTimeout =
conf.getInt(CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS,
CBlockConfigKeys.DFS_CBLOCK_RPC_TIMEOUT_SECONDS_DEFAULT) * 1000;
cblockClient = new CBlockServiceProtocolClientSideTranslatorPB( cblockClient = new CBlockServiceProtocolClientSideTranslatorPB(
RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version, RPC.getProtocolProxy(CBlockServiceProtocolPB.class, version,
serverAddress, UserGroupInformation.getCurrentUser(), conf, address, UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), 30000, RetryPolicies NetUtils.getDefaultSocketFactory(conf), rpcTimeout, RetryPolicies
.retryUpToMaximumCountWithFixedSleep(300, 1, TimeUnit .retryUpToMaximumCountWithFixedSleep(
.SECONDS)).getProxy()); 300, 1, TimeUnit.SECONDS)).getProxy());
} }
public void createVolume(String userName, String volumeName, public void createVolume(String userName, String volumeName,

View File

@ -77,6 +77,7 @@ public void run() {
String containerName = null; String containerName = null;
XceiverClientSpi client = null; XceiverClientSpi client = null;
LevelDBStore levelDBStore = null; LevelDBStore levelDBStore = null;
String traceID = flusher.getTraceID(new File(dbPath), block.getBlockID());
flusher.getLOG().debug( flusher.getLOG().debug(
"Writing block to remote. block ID: {}", block.getBlockID()); "Writing block to remote. block ID: {}", block.getBlockID());
try { try {
@ -94,8 +95,7 @@ public void run() {
Preconditions.checkState(data.length > 0, "Block data is zero length"); Preconditions.checkState(data.length > 0, "Block data is zero length");
startTime = Time.monotonicNow(); startTime = Time.monotonicNow();
ContainerProtocolCalls.writeSmallFile(client, containerName, ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), data, Long.toString(block.getBlockID()), data, traceID);
flusher.getTraceID(new File(dbPath), block.getBlockID()));
endTime = Time.monotonicNow(); endTime = Time.monotonicNow();
flusher.getTargetMetrics().updateContainerWriteLatency( flusher.getTargetMetrics().updateContainerWriteLatency(
endTime - startTime); endTime - startTime);
@ -107,7 +107,7 @@ public void run() {
} catch (Exception ex) { } catch (Exception ex) {
flusher.getLOG().error("Writing of block:{} failed, We have attempted " + flusher.getLOG().error("Writing of block:{} failed, We have attempted " +
"to write this block {} times to the container {}.Trace ID:{}", "to write this block {} times to the container {}.Trace ID:{}",
block.getBlockID(), this.getTryCount(), containerName, "", ex); block.getBlockID(), this.getTryCount(), containerName, traceID, ex);
writeRetryBlock(block); writeRetryBlock(block);
if (ex instanceof IOException) { if (ex instanceof IOException) {
flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks(); flusher.getTargetMetrics().incNumWriteIOExceptionRetryBlocks();

View File

@ -151,6 +151,7 @@ public long getLocalIOCount() {
*/ */
public void writeBlock(LogicalBlock block) throws IOException { public void writeBlock(LogicalBlock block) throws IOException {
byte[] keybuf = Longs.toByteArray(block.getBlockID()); byte[] keybuf = Longs.toByteArray(block.getBlockID());
String traceID = parentCache.getTraceID(block.getBlockID());
if (parentCache.isShortCircuitIOEnabled()) { if (parentCache.isShortCircuitIOEnabled()) {
long startTime = Time.monotonicNow(); long startTime = Time.monotonicNow();
getCacheDB().put(keybuf, block.getData().array()); getCacheDB().put(keybuf, block.getData().array());
@ -176,7 +177,7 @@ public void writeBlock(LogicalBlock block) throws IOException {
.acquireClient(parentCache.getPipeline(block.getBlockID())); .acquireClient(parentCache.getPipeline(block.getBlockID()));
ContainerProtocolCalls.writeSmallFile(client, containerName, ContainerProtocolCalls.writeSmallFile(client, containerName,
Long.toString(block.getBlockID()), block.getData().array(), Long.toString(block.getBlockID()), block.getData().array(),
parentCache.getTraceID(block.getBlockID())); traceID);
long endTime = Time.monotonicNow(); long endTime = Time.monotonicNow();
if (parentCache.isTraceEnabled()) { if (parentCache.isTraceEnabled()) {
String datahash = DigestUtils.sha256Hex(block.getData().array()); String datahash = DigestUtils.sha256Hex(block.getData().array());
@ -189,8 +190,9 @@ public void writeBlock(LogicalBlock block) throws IOException {
parentCache.getTargetMetrics().incNumDirectBlockWrites(); parentCache.getTargetMetrics().incNumDirectBlockWrites();
} catch (Exception ex) { } catch (Exception ex) {
parentCache.getTargetMetrics().incNumFailedDirectBlockWrites(); parentCache.getTargetMetrics().incNumFailedDirectBlockWrites();
LOG.error("Direct I/O writing of block:{} to container {} failed", LOG.error("Direct I/O writing of block:{} traceID:{} to "
block.getBlockID(), containerName, ex); + "container {} failed", block.getBlockID(), traceID,
containerName, ex);
throw ex; throw ex;
} finally { } finally {
if (client != null) { if (client != null) {

View File

@ -29,6 +29,7 @@
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* The internal representation maintained by CBlock server as the info for * The internal representation maintained by CBlock server as the info for
@ -53,7 +54,7 @@ public class VolumeDescriptor {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(VolumeDescriptor.class); LoggerFactory.getLogger(VolumeDescriptor.class);
private HashMap<String, ContainerDescriptor> containerMap; private ConcurrentHashMap<String, ContainerDescriptor> containerMap;
private String userName; private String userName;
private int blockSize; private int blockSize;
private long volumeSize; private long volumeSize;
@ -72,13 +73,12 @@ public class VolumeDescriptor {
* and set*() methods are for the same purpose also. * and set*() methods are for the same purpose also.
*/ */
public VolumeDescriptor() { public VolumeDescriptor() {
containerMap = new HashMap<>(); this(null, null, 0, 0);
containerIdOrdered = new ArrayList<>();
} }
public VolumeDescriptor(String userName, String volumeName, long volumeSize, public VolumeDescriptor(String userName, String volumeName, long volumeSize,
int blockSize) { int blockSize) {
this.containerMap = new HashMap<>(); this.containerMap = new ConcurrentHashMap<>();
this.userName = userName; this.userName = userName;
this.volumeName = volumeName; this.volumeName = volumeName;
this.blockSize = blockSize; this.blockSize = blockSize;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.cblock.storage; package org.apache.hadoop.cblock.storage;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.cblock.CBlockConfigKeys;
import org.apache.hadoop.cblock.exception.CBlockException; import org.apache.hadoop.cblock.exception.CBlockException;
import org.apache.hadoop.cblock.meta.ContainerDescriptor; import org.apache.hadoop.cblock.meta.ContainerDescriptor;
import org.apache.hadoop.cblock.meta.VolumeDescriptor; import org.apache.hadoop.cblock.meta.VolumeDescriptor;
@ -37,25 +38,27 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* This class maintains the key space of CBlock, more specifically, the * This class maintains the key space of CBlock, more specifically, the
* volume to container mapping. The core data structure * volume to container mapping. The core data structure
* is a map from users to their volumes info, where volume info is a handler * is a map from users to their volumes info, where volume info is a handler
* to a volume, containing information for IO on that volume. * to a volume, containing information for IO on that volume and a storage
* * client responsible for talking to the SCM.
* and a storage client responsible for talking to the SCM
*
* TODO : all the volume operations are fully serialized, which can potentially
* be optimized.
*
* TODO : if the certain operations (e.g. create) failed, the failure-handling
* logic may not be properly implemented currently.
*/ */
public class StorageManager { public class StorageManager {
private static final Logger LOGGER = private static final Logger LOGGER =
LoggerFactory.getLogger(StorageManager.class); LoggerFactory.getLogger(StorageManager.class);
private final ScmClient storageClient; private final ScmClient storageClient;
private final int numThreads;
private static final int MAX_THREADS =
Runtime.getRuntime().availableProcessors() * 2;
private static final int MAX_QUEUE_CAPACITY = 1024;
/** /**
* We will NOT have the situation where same kv pair getting * We will NOT have the situation where same kv pair getting
* processed, but it is possible to have multiple kv pair being * processed, but it is possible to have multiple kv pair being
@ -78,6 +81,9 @@ public StorageManager(ScmClient storageClient,
this.storageClient = storageClient; this.storageClient = storageClient;
this.user2VolumeMap = new ConcurrentHashMap<>(); this.user2VolumeMap = new ConcurrentHashMap<>();
this.containerSizeB = storageClient.getContainerSize(null); this.containerSizeB = storageClient.getContainerSize(null);
this.numThreads =
ozoneConfig.getInt(CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE,
CBlockConfigKeys.DFS_CBLOCK_MANAGER_POOL_SIZE_DEFAULT);
} }
/** /**
@ -149,6 +155,127 @@ public synchronized void addVolume(VolumeDescriptor volumeDescriptor)
makeVolumeReady(userName, volumeName, volumeDescriptor); makeVolumeReady(userName, volumeName, volumeDescriptor);
} }
private class CreateContainerTask implements Runnable {
private final VolumeDescriptor volume;
private final int containerIdx;
private final ArrayList<String> containerIds;
private final AtomicInteger numFailed;
CreateContainerTask(VolumeDescriptor volume, int containerIdx,
ArrayList<String> containerIds,
AtomicInteger numFailed) {
this.volume = volume;
this.containerIdx = containerIdx;
this.containerIds = containerIds;
this.numFailed = numFailed;
}
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see Thread#run()
*/
public void run() {
ContainerDescriptor container = null;
try {
Pipeline pipeline = storageClient.createContainer(
OzoneProtos.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE,
KeyUtil.getContainerName(volume.getUserName(),
volume.getVolumeName(), containerIdx));
container = new ContainerDescriptor(pipeline.getContainerName());
container.setPipeline(pipeline);
container.setContainerIndex(containerIdx);
volume.addContainer(container);
containerIds.set(containerIdx, container.getContainerID());
} catch (Exception e) {
numFailed.incrementAndGet();
if (container != null) {
LOGGER.error("Error creating container Container:{}:" +
" index:{} error:{}", container.getContainerID(),
containerIdx, e);
}
}
}
}
/**
 * Creates all containers backing the given volume in parallel and stores
 * the resulting container IDs on the volume.
 *
 * One container is created for every {@code containerSizeB} bytes of the
 * volume size. Slots of containers that could not be created stay
 * {@code null} in the ID list handed to the volume.
 *
 * @param volume the volume whose containers should be created.
 * @return true only if every container was created and the wait for the
 *         tasks was not interrupted.
 */
private boolean createVolumeContainers(VolumeDescriptor volume) {
  // Reserve one slot per container BEFORE any task is submitted. Workers
  // write their slot with set(); growing the list concurrently with those
  // writes could drop a result when the backing array is reallocated.
  ArrayList<String> containerIds = new ArrayList<>();
  for (long allocatedSize = 0; allocatedSize < volume.getVolumeSize();
      allocatedSize += containerSizeB) {
    containerIds.add(null);
  }

  // The configured pool size may exceed MAX_THREADS (2 x CPU cores); the
  // executor rejects maximumPoolSize < corePoolSize, so widen the maximum.
  ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
      Math.max(numThreads, MAX_THREADS), 1, TimeUnit.SECONDS,
      new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
      new ThreadPoolExecutor.CallerRunsPolicy());

  AtomicInteger numFailedCreates = new AtomicInteger(0);
  for (int containerIdx = 0; containerIdx < containerIds.size();
      containerIdx++) {
    executor.submit(new CreateContainerTask(volume, containerIdx,
        containerIds, numFailedCreates));
  }

  // issue the command and then wait for it to finish
  executor.shutdown();
  boolean interrupted = false;
  try {
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
  } catch (InterruptedException e) {
    LOGGER.error("Interrupted while creating volume:{}",
        volume.getVolumeName(), e);
    executor.shutdownNow();
    Thread.currentThread().interrupt();
    // Tasks discarded by shutdownNow() never ran, so they are not counted
    // in numFailedCreates; the volume must still be reported as incomplete.
    interrupted = true;
  }

  volume.setContainerIDs(containerIds);
  return !interrupted && numFailedCreates.get() == 0;
}
/**
 * Best-effort removal of a single container: looks up the pipeline of the
 * container through the storage client and asks it to delete the container.
 * Failures are logged and not propagated to the caller.
 *
 * @param containerID ID of the container to delete.
 * @param force flag handed through to the storage client's delete call.
 */
private void deleteContainer(String containerID, boolean force) {
  try {
    storageClient.deleteContainer(
        storageClient.getContainer(containerID), force);
  } catch (Exception deleteFailure) {
    LOGGER.error("Error deleting container Container:{} error:{}",
        containerID, deleteFailure);
  }
}
/**
 * Deletes the given containers in parallel and waits for all deletions to
 * finish. {@code null} entries (containers that were never created) are
 * skipped. Individual deletion failures are logged by
 * {@link #deleteContainer(String, boolean)} and do not abort the others.
 *
 * @param containers IDs of the containers to delete; may contain nulls.
 * @param force flag handed through to each container deletion.
 * @throws CBlockException declared for callers; not thrown by the current
 *                         implementation.
 */
private void deleteVolumeContainers(List<String> containers, boolean force)
    throws CBlockException {
  // The configured pool size may exceed MAX_THREADS (2 x CPU cores); the
  // executor rejects maximumPoolSize < corePoolSize, so widen the maximum.
  ThreadPoolExecutor executor = new ThreadPoolExecutor(numThreads,
      Math.max(numThreads, MAX_THREADS), 1, TimeUnit.SECONDS,
      new ArrayBlockingQueue<>(MAX_QUEUE_CAPACITY),
      new ThreadPoolExecutor.CallerRunsPolicy());

  for (String deleteContainer : containers) {
    if (deleteContainer != null) {
      Runnable task = () -> deleteContainer(deleteContainer, force);
      executor.submit(task);
    }
  }

  // issue the command and then wait for it to finish
  executor.shutdown();
  try {
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
  } catch (InterruptedException e) {
    // Exception goes in as the Throwable argument so the stack trace is
    // logged instead of a literal "{}" placeholder.
    LOGGER.error("Interrupted while deleting containers", e);
    executor.shutdownNow();
    Thread.currentThread().interrupt();
  }
}
/** /**
* Called by CBlock server when creating a fresh volume. The core * Called by CBlock server when creating a fresh volume. The core
@ -172,31 +299,13 @@ public synchronized void createVolume(String userName, String volumeName,
throw new CBlockException("Volume size smaller than block size? " + throw new CBlockException("Volume size smaller than block size? " +
"volume size:" + volumeSize + " block size:" + blockSize); "volume size:" + volumeSize + " block size:" + blockSize);
} }
VolumeDescriptor volume; VolumeDescriptor volume
int containerIdx = 0; = new VolumeDescriptor(userName, volumeName, volumeSize, blockSize);
try { boolean success = createVolumeContainers(volume);
volume = new VolumeDescriptor(userName, volumeName, if (!success) {
volumeSize, blockSize); // cleanup the containers and throw the exception
long allocatedSize = 0; deleteVolumeContainers(volume.getContainerIDsList(), true);
ArrayList<String> containerIds = new ArrayList<>(); throw new CBlockException("Error when creating volume:" + volumeName);
while (allocatedSize < volumeSize) {
Pipeline pipeline = storageClient.createContainer(OzoneProtos
.ReplicationType.STAND_ALONE,
OzoneProtos.ReplicationFactor.ONE,
KeyUtil.getContainerName(userName, volumeName, containerIdx));
ContainerDescriptor container =
new ContainerDescriptor(pipeline.getContainerName());
container.setPipeline(pipeline);
container.setContainerIndex(containerIdx);
volume.addContainer(container);
containerIds.add(container.getContainerID());
allocatedSize += containerSizeB;
containerIdx += 1;
}
volume.setContainerIDs(containerIds);
} catch (IOException e) {
throw new CBlockException("Error when creating volume:" + e.getMessage());
// TODO : delete already created containers? or re-try policy
} }
makeVolumeReady(userName, volumeName, volume); makeVolumeReady(userName, volumeName, volume);
} }
@ -223,16 +332,7 @@ public synchronized void deleteVolume(String userName, String volumeName,
throw new CBlockException("Deleting a non-empty volume without force!"); throw new CBlockException("Deleting a non-empty volume without force!");
} }
VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName); VolumeDescriptor volume = user2VolumeMap.get(userName).remove(volumeName);
for (String containerID : volume.getContainerIDsList()) { deleteVolumeContainers(volume.getContainerIDsList(), force);
try {
Pipeline pipeline = storageClient.getContainer(containerID);
storageClient.deleteContainer(pipeline, force);
} catch (IOException e) {
LOGGER.error("Error deleting container Container:{} error:{}",
containerID, e);
throw new CBlockException(e.getMessage());
}
}
if (user2VolumeMap.get(userName).size() == 0) { if (user2VolumeMap.get(userName).size() == 0) {
user2VolumeMap.remove(userName); user2VolumeMap.remove(userName);
} }

View File

@ -897,6 +897,22 @@
</description> </description>
</property> </property>
<property>
<name>dfs.cblock.manager.pool.size</name>
<value>16</value>
<description>
Number of threads that cblock manager will use for container operations.
</description>
</property>
<property>
<name>dfs.cblock.rpc.timeout.seconds</name>
<value>300</value>
<description>
RPC timeout in seconds used for cblock CLI operations.
</description>
</property>
<property> <property>
<name>dfs.cblock.scm.ipaddress</name> <name>dfs.cblock.scm.ipaddress</name>
<value>127.0.0.1</value> <value>127.0.0.1</value>

View File

@ -22,7 +22,7 @@
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
* NOTE : This class is only for testing purpose. * NOTE : This class is only for testing purpose.
@ -34,8 +34,8 @@
* This is to allow volume creation call and perform standalone tests. * This is to allow volume creation call and perform standalone tests.
*/ */
public final class ContainerLookUpService { public final class ContainerLookUpService {
private static HashMap<String, ContainerDescriptor> private static ConcurrentHashMap<String, ContainerDescriptor>
containers = new HashMap<>(); containers = new ConcurrentHashMap<>();
/** /**
* Return an *existing* container with given Id. * Return an *existing* container with given Id.

View File

@ -27,6 +27,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* This class is the one that directly talks to SCM server. * This class is the one that directly talks to SCM server.
@ -37,7 +38,8 @@
* *
*/ */
public class MockStorageClient implements ScmClient { public class MockStorageClient implements ScmClient {
private static long currentContainerId = -1; private static AtomicInteger currentContainerId =
new AtomicInteger(0);
/** /**
* Ask SCM to get a exclusive container. * Ask SCM to get a exclusive container.
@ -48,9 +50,9 @@ public class MockStorageClient implements ScmClient {
@Override @Override
public Pipeline createContainer(String containerId) public Pipeline createContainer(String containerId)
throws IOException { throws IOException {
currentContainerId += 1; int contId = currentContainerId.getAndIncrement();
ContainerLookUpService.addContainer(Long.toString(currentContainerId)); ContainerLookUpService.addContainer(Long.toString(contId));
return ContainerLookUpService.lookUp(Long.toString(currentContainerId)) return ContainerLookUpService.lookUp(Long.toString(contId))
.getPipeline(); .getPipeline();
} }
@ -126,10 +128,11 @@ public long getContainerSize(Pipeline pipeline) throws IOException {
public Pipeline createContainer(OzoneProtos.ReplicationType type, public Pipeline createContainer(OzoneProtos.ReplicationType type,
OzoneProtos.ReplicationFactor replicationFactor, String containerId) OzoneProtos.ReplicationFactor replicationFactor, String containerId)
throws IOException { throws IOException {
currentContainerId += 1; int contId = currentContainerId.getAndIncrement();
ContainerLookUpService.addContainer(Long.toString(currentContainerId)); ContainerLookUpService.addContainer(Long.toString(contId));
return ContainerLookUpService.lookUp(Long.toString(currentContainerId)) return ContainerLookUpService.lookUp(Long.toString(contId))
.getPipeline(); } .getPipeline();
}
/** /**
* Returns a set of Nodes that meet a query criteria. * Returns a set of Nodes that meet a query criteria.