From ad9441122f31547fcab29f50e64d52a8895906b6 Mon Sep 17 00:00:00 2001
From: Kai Zheng
Date: Sun, 15 May 2016 23:39:09 -0700
Subject: [PATCH] HDFS-8449. Add tasks count metrics to datanode for ECWorker.
 Contributed by Bo Li.

---
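Notes:

The counters added to DataNodeMetrics follow the stock metrics2 pattern: an
annotated MutableCounterLong field is instantiated when the source is
registered, and is published under the capitalized field name, which is why
the new test asserts on "EcReconstructionTasks" and
"EcFailedReconstructionTasks". A minimal sketch of that pattern, using an
illustrative ExampleMetrics source that is not part of this patch:

    import org.apache.hadoop.metrics2.annotation.Metric;
    import org.apache.hadoop.metrics2.annotation.Metrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.MutableCounterLong;

    // Hypothetical source showing the same @Metric/MutableCounterLong
    // pattern that DataNodeMetrics uses below.
    @Metrics(about = "Example metrics", context = "dfs")
    class ExampleMetrics {
      @Metric("Count of example tasks")
      MutableCounterLong exampleTasks;

      static ExampleMetrics create() {
        // Registration makes the metrics system instantiate the annotated
        // fields and expose the source to sinks and JMX.
        return DefaultMetricsSystem.instance().register(
            "ExampleMetrics", null, new ExampleMetrics());
      }

      void incrExampleTasks() {
        exampleTasks.incr(); // thread-safe counter increment
      }
    }

DataNodeMetrics already has its own registry and registration path, so the
patch only needs the two annotated fields plus the two incr helpers.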
 .../erasurecode/StripedReconstructor.java     |   4 +-
 .../datanode/metrics/DataNodeMetrics.java     |  14 ++
 .../hadoop/hdfs/StripedFileTestUtil.java      |  32 +++-
 .../hdfs/TestReconstructStripedFile.java      |  27 +---
 .../TestDataNodeErasureCodingMetrics.java     | 153 ++++++++++++++++++
 5 files changed, 204 insertions(+), 26 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 1b59b22ec2..c80bf96143 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -179,11 +179,11 @@ public void run() {
       // block replication.
     } catch (Throwable e) {
       LOG.warn("Failed to reconstruct striped block: {}", blockGroup, e);
+      datanode.getMetrics().incrECFailedReconstructionTasks();
     } finally {
       datanode.decrementXmitsInProgress();
-
+      datanode.getMetrics().incrECReconstructionTasks();
       stripedReader.close();
-
       stripedWriter.close();
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 085762bf46..3d504d6b0e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -126,6 +126,11 @@ public class DataNodeMetrics {
   @Metric MutableRate sendDataPacketTransferNanos;
   final MutableQuantiles[] sendDataPacketTransferNanosQuantiles;
 
+  @Metric("Count of erasure coding reconstruction tasks")
+  MutableCounterLong ecReconstructionTasks;
+  @Metric("Count of erasure coding failed reconstruction tasks")
+  MutableCounterLong ecFailedReconstructionTasks;
+
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
   JvmMetrics jvmMetrics = null;
@@ -415,4 +420,13 @@ public void addRamDiskBlocksLazyPersistWindowMs(long latencyMs) {
       q.add(latencyMs);
     }
   }
+
+  public void incrECReconstructionTasks() {
+    ecReconstructionTasks.incr();
+  }
+
+  public void incrECFailedReconstructionTasks() {
+    ecFailedReconstructionTasks.incr();
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 3c58133947..6dcccc3cdc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -74,7 +74,7 @@ public class StripedFileTestUtil {
   static int numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
   static int BLOCK_GROUP_SIZE = blockSize * NUM_DATA_BLOCKS;
 
-  static byte[] generateBytes(int cnt) {
+  public static byte[] generateBytes(int cnt) {
     byte[] bytes = new byte[cnt];
     for (int i = 0; i < cnt; i++) {
       bytes[i] = getByte(i);
@@ -502,4 +502,34 @@ static void verifyParityBlocks(Configuration conf, final long size,
       }
     }
   }
+
+  /**
+   * Wait for the reconstruction to be finished when the file has
+   * corrupted blocks.
+   */
+  public static LocatedBlocks waitForReconstructionFinished(Path file,
+      DistributedFileSystem fs, int groupSize)
+      throws Exception {
+    final int attempts = 60;
+    for (int i = 0; i < attempts; i++) {
+      LocatedBlocks locatedBlocks = getLocatedBlocks(file, fs);
+      LocatedStripedBlock lastBlock =
+          (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+      DatanodeInfo[] storageInfos = lastBlock.getLocations();
+      if (storageInfos.length >= groupSize) {
+        return locatedBlocks;
+      }
+      Thread.sleep(1000);
+    }
+    throw new IOException("Time out waiting for EC block reconstruction.");
+  }
+
+  /**
+   * Get the located blocks of a file.
+   */
+  public static LocatedBlocks getLocatedBlocks(Path file,
+      DistributedFileSystem fs)
+      throws IOException {
+    return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
index 7155e74463..36d2dbd65e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java
@@ -269,7 +269,8 @@ private void assertFileBlocksReconstruction(String fileName, int fileLen,
     DFSTestUtil.writeFile(fs, file, data);
     StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
 
-    LocatedBlocks locatedBlocks = getLocatedBlocks(file);
+    LocatedBlocks locatedBlocks =
+        StripedFileTestUtil.getLocatedBlocks(file, fs);
     assertEquals(locatedBlocks.getFileLength(), fileLen);
 
     LocatedStripedBlock lastBlock =
@@ -325,7 +326,7 @@ private void assertFileBlocksReconstruction(String fileName, int fileLen,
     int stoppedDN = generateErrors(errorMap, type);
 
     // Check the locatedBlocks of the file again
-    locatedBlocks = getLocatedBlocks(file);
+    locatedBlocks = StripedFileTestUtil.getLocatedBlocks(file, fs);
     lastBlock = (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
     storageInfos = lastBlock.getLocations();
     assertEquals(storageInfos.length, groupSize - stoppedDN);
@@ -338,7 +339,7 @@ private void assertFileBlocksReconstruction(String fileName, int fileLen,
       }
     }
 
-    waitForReconstructionFinished(file, groupSize);
+    StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
 
     targetDNs = sortTargetsByReplicas(blocks, targetDNs);
 
@@ -381,26 +382,6 @@ private int[] sortTargetsByReplicas(ExtendedBlock[] blocks, int[] targetDNs) {
     return result;
   }
 
-  private LocatedBlocks waitForReconstructionFinished(Path file, int groupSize)
-      throws Exception {
-    final int ATTEMPTS = 60;
-    for (int i = 0; i < ATTEMPTS; i++) {
-      LocatedBlocks locatedBlocks = getLocatedBlocks(file);
-      LocatedStripedBlock lastBlock =
-          (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
-      DatanodeInfo[] storageInfos = lastBlock.getLocations();
-      if (storageInfos.length >= groupSize) {
-        return locatedBlocks;
-      }
-      Thread.sleep(1000);
-    }
-    throw new IOException ("Time out waiting for EC block reconstruction.");
-  }
-
-  private LocatedBlocks getLocatedBlocks(Path file) throws IOException {
-    return fs.getClient().getLocatedBlocks(file.toString(), 0, Long.MAX_VALUE);
-  }
-
   /*
    * Tests that processErasureCodingTasks should not throw exceptions out due to
    * invalid ECTask submission.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
new file mode 100644
index 0000000000..e401fed1f7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+/**
+ * This file tests the erasure coding metrics in DataNode.
+ */
+public class TestDataNodeErasureCodingMetrics {
+  public static final Log LOG = LogFactory.
+      getLog(TestDataNodeErasureCodingMetrics.class);
+
+  private static final int DATA_BLK_NUM = StripedFileTestUtil.NUM_DATA_BLOCKS;
+  private static final int PARITY_BLK_NUM =
+      StripedFileTestUtil.NUM_PARITY_BLOCKS;
+  private static final int CELLSIZE =
+      StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
+  private static final int BLOCKSIZE = CELLSIZE;
+  private static final int GROUPSIZE = DATA_BLK_NUM + PARITY_BLK_NUM;
+  private static final int DN_NUM = GROUPSIZE + 1;
+
+  private MiniDFSCluster cluster;
+  private Configuration conf;
+  private DistributedFileSystem fs;
+
+  @Before
+  public void setup() throws IOException {
+    conf = new Configuration();
+
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DN_NUM).build();
+    cluster.waitActive();
+    cluster.getFileSystem().getClient().setErasureCodingPolicy("/", null);
+    fs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testEcTasks() throws Exception {
+    DataNode workerDn = doTest("/testEcTasks");
+    MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
+    assertCounter("EcReconstructionTasks", 1L, rb);
+    assertCounter("EcFailedReconstructionTasks", 0L, rb);
+  }
+
+  private DataNode doTest(String fileName) throws Exception {
+
+    Path file = new Path(fileName);
+    long fileLen = DATA_BLK_NUM * BLOCKSIZE;
+    final byte[] data = StripedFileTestUtil.generateBytes((int) fileLen);
+    DFSTestUtil.writeFile(fs, file, data);
+    StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
+
+    LocatedBlocks locatedBlocks =
+        StripedFileTestUtil.getLocatedBlocks(file, fs);
+    // The file has only one block group.
+    LocatedStripedBlock lastBlock =
+        (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+    DataNode workerDn = null;
+    DatanodeInfo[] locations = lastBlock.getLocations();
+    assertEquals(locations.length, GROUPSIZE);
+    // We have ONE extra datanode in addition to the GROUPSIZE datanodes;
+    // find that extra datanode, since it is the one the reconstruction
+    // task will run on, according to the current block placement logic
+    // for striped files. This can be improved later to be flexible about
+    // where the task runs.
+    for (DataNode dn : cluster.getDataNodes()) {
+      boolean appear = false;
+      for (DatanodeInfo info : locations) {
+        if (dn.getDatanodeUuid().equals(info.getDatanodeUuid())) {
+          appear = true;
+          break;
+        }
+      }
+      if (!appear) {
+        workerDn = dn;
+        break;
+      }
+    }
+    byte[] indices = lastBlock.getBlockIndices();
+    // Take down the datanode holding the first internal block.
+    DataNode toCorruptDn = cluster.getDataNodes().get(indices[0]);
+    toCorruptDn.shutdown();
+    setDataNodeDead(toCorruptDn.getDatanodeId());
+    DFSTestUtil.waitForDatanodeState(cluster, toCorruptDn.getDatanodeUuid(),
+        false, 10000);
+    final BlockManager bm = cluster.getNamesystem().getBlockManager();
+    BlockManagerTestUtil.getComputedDatanodeWork(bm);
+    cluster.triggerHeartbeats();
+    StripedFileTestUtil.waitForReconstructionFinished(file, fs, GROUPSIZE);
+
+    return workerDn;
+  }
+
+  private void setDataNodeDead(DatanodeID dnID) throws IOException {
+    DatanodeDescriptor dnd =
+        NameNodeAdapter.getDatanode(cluster.getNamesystem(), dnID);
+    DFSTestUtil.setDatanodeDead(dnd);
+    BlockManagerTestUtil.checkHeartbeat(
+        cluster.getNamesystem().getBlockManager());
+  }
+
+}
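Notes: once a datanode with this patch is running, the new counters surface
through the configured metrics2 sinks and over JMX alongside the other
DataNodeActivity metrics. A minimal in-process probe, assuming it runs in the
same JVM as the datanode (for example from a MiniDFSCluster test); the class
name, hostname, and port below are illustrative placeholders, not part of
this patch:

    import java.lang.management.ManagementFactory;

    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class EcMetricsJmxProbe {
      public static void main(String[] args) throws Exception {
        // Metrics2 sources appear as Hadoop:service=<service>,name=<source>;
        // DataNodeMetrics names itself "DataNodeActivity-<hostname>-<port>",
        // so substitute the real datanode hostname and port here.
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName dnBean = new ObjectName(
            "Hadoop:service=DataNode,name=DataNodeActivity-localhost-50010");
        System.out.println("EcReconstructionTasks = "
            + mbs.getAttribute(dnBean, "EcReconstructionTasks"));
        System.out.println("EcFailedReconstructionTasks = "
            + mbs.getAttribute(dnBean, "EcFailedReconstructionTasks"));
      }
    }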