HDFS-12044. Mismatch between BlockManager.maxReplicationStreams and ErasureCodingWorker.stripedReconstructionPool pool size causes slow and bursty recovery. (Contributed by Lei (Eddy) Xu)

This commit is contained in:
Lei Xu 2017-07-28 10:49:23 -07:00
parent 9ea01fd956
commit 77791e4c36
8 changed files with 169 additions and 10 deletions

View File

@ -83,6 +83,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -811,10 +812,30 @@ private static boolean getClientDataTransferTcpNoDelay(Configuration conf) {
public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
int maxPoolSize, long keepAliveTimeSecs, String threadNamePrefix,
boolean runRejectedExec) {
return getThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTimeSecs,
new SynchronousQueue<>(), threadNamePrefix, runRejectedExec);
}
/**
* Utility to create a {@link ThreadPoolExecutor}.
*
* @param corePoolSize - min threads in the pool, even if idle
* @param maxPoolSize - max threads in the pool
* @param keepAliveTimeSecs - max seconds beyond which excess idle threads
* will be terminated
* @param queue - the queue to use for holding tasks before they are executed.
* @param threadNamePrefix - name prefix for the pool threads
* @param runRejectedExec - when true, rejected tasks from
* ThreadPoolExecutor are run in the context of calling thread
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor getThreadPoolExecutor(int corePoolSize,
int maxPoolSize, long keepAliveTimeSecs, BlockingQueue<Runnable> queue,
String threadNamePrefix, boolean runRejectedExec) {
Preconditions.checkArgument(corePoolSize > 0);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new Daemon.DaemonFactory() {
queue, new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override

View File

@ -2203,6 +2203,17 @@ public void incrementXmitsInProgress() {
xmitsInProgress.getAndIncrement();
}
/**
* Increments the xmitInProgress count by given value.
*
* @param delta the amount of xmitsInProgress to increase.
* @see #incrementXmitsInProgress()
*/
public void incrementXmitsInProcess(int delta) {
Preconditions.checkArgument(delta >= 0);
xmitsInProgress.getAndAdd(delta);
}
/**
* Decrements the xmitsInProgress count
*/
@ -2210,6 +2221,16 @@ public void decrementXmitsInProgress() {
xmitsInProgress.getAndDecrement();
}
/**
* Decrements the xmitsInProgress count by given value.
*
* @see #decrementXmitsInProgress()
*/
public void decrementXmitsInProgress(int delta) {
Preconditions.checkArgument(delta >= 0);
xmitsInProgress.getAndAdd(0 - delta);
}
private void reportBadBlock(final BPOfferService bpos,
final ExtendedBlock block, final String msg) {
FsVolumeSpi volume = getFSDataset().getVolume(block);

View File

@ -27,6 +27,7 @@
import org.slf4j.Logger;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -93,7 +94,8 @@ private void initializeStripedBlkReconstructionThreadPool(int numThreads) {
LOG.debug("Using striped block reconstruction; pool threads={}",
numThreads);
stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor(2,
numThreads, 60, "StripedBlockReconstruction-", false);
numThreads, 60, new LinkedBlockingQueue<>(),
"StripedBlockReconstruction-", false);
stripedReconstructionPool.allowCoreThreadTimeOut(true);
}
@ -106,6 +108,7 @@ private void initializeStripedBlkReconstructionThreadPool(int numThreads) {
public void processErasureCodingTasks(
Collection<BlockECReconstructionInfo> ecTasks) {
for (BlockECReconstructionInfo reconInfo : ecTasks) {
int xmitsSubmitted = 0;
try {
StripedReconstructionInfo stripedReconInfo =
new StripedReconstructionInfo(
@ -113,15 +116,25 @@ public void processErasureCodingTasks(
reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(),
reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes(),
reconInfo.getTargetStorageIDs());
// It may throw IllegalArgumentException from task#stripedReader
// constructor.
final StripedBlockReconstructor task =
new StripedBlockReconstructor(this, stripedReconInfo);
if (task.hasValidTargets()) {
// See HDFS-12044. We increase xmitsInProgress even the task is only
// enqueued, so that
// 1) NN will not send more tasks than what DN can execute and
// 2) DN will not throw away reconstruction tasks, and instead keeps
// an unbounded number of tasks in the executor's task queue.
xmitsSubmitted = task.getXmits();
getDatanode().incrementXmitsInProcess(xmitsSubmitted);
stripedReconstructionPool.submit(task);
} else {
LOG.warn("No missing internal block. Skip reconstruction for task:{}",
reconInfo);
}
} catch (Throwable e) {
getDatanode().decrementXmitsInProgress(xmitsSubmitted);
LOG.warn("Failed to reconstruct striped block {}",
reconInfo.getExtendedBlock().getLocalBlock(), e);
}

View File

@ -48,7 +48,6 @@ boolean hasValidTargets() {
@Override
public void run() {
getDatanode().incrementXmitsInProgress();
try {
initDecoderIfNecessary();
@ -66,7 +65,7 @@ public void run() {
LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e);
getDatanode().getMetrics().incrECFailedReconstructionTasks();
} finally {
getDatanode().decrementXmitsInProgress();
getDatanode().decrementXmitsInProgress(getXmits());
final DataNodeMetrics metrics = getDatanode().getMetrics();
metrics.incrECReconstructionTasks();
metrics.incrECReconstructionBytesRead(getBytesRead());

View File

@ -68,6 +68,8 @@ class StripedReader {
private int[] successList;
private final int minRequiredSources;
// the number of xmits used by the re-construction task.
private final int xmits;
// The buffers and indices for striped blocks whose length is 0
private ByteBuffer[] zeroStripeBuffers;
private short[] zeroStripeIndices;
@ -107,6 +109,12 @@ class StripedReader {
zeroStripeIndices = new short[zeroStripNum];
}
// It is calculated by the maximum number of connections from either sources
// or targets.
xmits = Math.max(minRequiredSources,
stripedReconInfo.getTargets() != null ?
stripedReconInfo.getTargets().length : 0);
this.liveIndices = stripedReconInfo.getLiveIndices();
assert liveIndices != null;
this.sources = stripedReconInfo.getSources();
@ -472,4 +480,16 @@ InetSocketAddress getSocketAddress4Transfer(DatanodeInfo dnInfo) {
CachingStrategy getCachingStrategy() {
return reconstructor.getCachingStrategy();
}
/**
* Return the xmits of this EC reconstruction task.
* <p>
* DN uses it to coordinate with NN to adjust the speed of scheduling the
* EC reconstruction tasks to this DN.
*
* @return the xmits of this reconstruction task.
*/
int getXmits() {
return xmits;
}
}

View File

@ -103,5 +103,20 @@ StorageType[] getTargetStorageTypes() {
String[] getTargetStorageIds() {
return targetStorageIds;
}
/**
* Return the weight of this EC reconstruction task.
*
* DN uses it to coordinate with NN to adjust the speed of scheduling the
* reconstructions tasks to this DN.
*
* @return the weight of this reconstruction task.
* @see HDFS-12044
*/
int getWeight() {
// See HDFS-12044. The weight of a RS(n, k) is calculated by the network
// connections it opens.
return sources.length + targets.length;
}
}

View File

@ -133,7 +133,6 @@ abstract class StripedReconstructor {
}
blockGroup = stripedReconInfo.getBlockGroup();
stripedReader = new StripedReader(this, datanode, conf, stripedReconInfo);
cachingStrategy = CachingStrategy.newDefaultStrategy();
positionInBlock = 0L;
@ -233,6 +232,13 @@ ExtendedBlock getBlockGroup() {
return blockGroup;
}
/**
* Get the xmits that _will_ be used for this reconstruction task.
*/
int getXmits() {
return stripedReader.getXmits();
}
BitSet getLiveBitSet() {
return liveBitSet;
}

View File

@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -44,6 +45,7 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
@ -81,6 +83,7 @@ enum ReconstructionType {
Any
}
private Configuration conf;
private MiniDFSCluster cluster;
private DistributedFileSystem fs;
// Map: DatanodeID -> datanode index in cluster
@ -89,7 +92,7 @@ enum ReconstructionType {
@Before
public void setup() throws IOException {
final Configuration conf = new Configuration();
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
conf.setInt(
DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
@ -263,6 +266,14 @@ private int generateErrors(Map<ExtendedBlock, DataNode> corruptTargets,
return stoppedDNs;
}
private static void writeFile(DistributedFileSystem fs, String fileName,
int fileLen) throws Exception {
final byte[] data = new byte[fileLen];
Arrays.fill(data, (byte) 1);
DFSTestUtil.writeFile(fs, new Path(fileName), data);
StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
}
/**
* Test the file blocks reconstruction.
* 1. Check the replica is reconstructed in the target datanode,
@ -278,10 +289,7 @@ private void assertFileBlocksReconstruction(String fileName, int fileLen,
Path file = new Path(fileName);
final byte[] data = new byte[fileLen];
Arrays.fill(data, (byte) 1);
DFSTestUtil.writeFile(fs, file, data);
StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
writeFile(fs, fileName, fileLen);
LocatedBlocks locatedBlocks =
StripedFileTestUtil.getLocatedBlocks(file, fs);
@ -424,4 +432,60 @@ public void testProcessErasureCodingTasksSubmitionShouldSucceed()
ecTasks.add(invalidECInfo);
dataNode.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
}
// HDFS-12044
@Test(timeout = 60000)
public void testNNSendsErasureCodingTasks() throws Exception {
testNNSendsErasureCodingTasks(1);
testNNSendsErasureCodingTasks(2);
}
private void testNNSendsErasureCodingTasks(int deadDN) throws Exception {
cluster.shutdown();
final int numDataNodes = dnNum + 1;
conf.setInt(
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY, 10);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 20);
conf.setInt(DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_BLK_THREADS_KEY,
2);
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDataNodes).build();
cluster.waitActive();
fs = cluster.getFileSystem();
ErasureCodingPolicy policy = StripedFileTestUtil.getDefaultECPolicy();
fs.getClient().setErasureCodingPolicy("/", policy.getName());
final int fileLen = cellSize * ecPolicy.getNumDataUnits() * 2;
for (int i = 0; i < 100; i++) {
writeFile(fs, "/ec-file-" + i, fileLen);
}
// Inject data-loss by tear down desired number of DataNodes.
assertTrue(policy.getNumParityUnits() >= deadDN);
List<DataNode> dataNodes = new ArrayList<>(cluster.getDataNodes());
Collections.shuffle(dataNodes);
for (DataNode dn : dataNodes.subList(0, deadDN)) {
shutdownDataNode(dn);
}
final FSNamesystem ns = cluster.getNamesystem();
GenericTestUtils.waitFor(() -> ns.getPendingDeletionBlocks() == 0,
500, 30000);
// Make sure that all pending reconstruction tasks can be processed.
while (ns.getPendingReconstructionBlocks() > 0) {
long timeoutPending = ns.getNumTimedOutPendingReconstructions();
assertTrue(String.format("Found %d timeout pending reconstruction tasks",
timeoutPending), timeoutPending == 0);
Thread.sleep(1000);
}
// Verify all DN reaches zero xmitsInProgress.
GenericTestUtils.waitFor(() ->
cluster.getDataNodes().stream().mapToInt(
DataNode::getXmitsInProgress).sum() == 0,
500, 30000
);
}
}