HDFS-8410. Add computation time metrics to datanode for ECWorker. Contributed by SammiChen.
commit 61e30cf83c
parent ae8bccd509
StripedBlockReconstructor.java

@@ -103,7 +103,10 @@ class StripedBlockReconstructor extends StripedReconstructor
     int[] erasedIndices = stripedWriter.getRealTargetIndices();
     ByteBuffer[] outputs = stripedWriter.getRealTargetBuffers(toReconstructLen);
 
+    long start = System.nanoTime();
     getDecoder().decode(inputs, erasedIndices, outputs);
+    long end = System.nanoTime();
+    this.getDatanode().getMetrics().incrECDecodingTime(end - start);
 
     stripedWriter.updateRealTargetBuffers(toReconstructLen);
   }
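The instrumentation above is the standard monotonic-clock timing pattern: System.nanoTime() is unaffected by wall-clock adjustments, so the difference of two readings is a safe elapsed-time measurement. A minimal standalone sketch of the same pattern (burnCpu is a hypothetical stand-in for the getDecoder().decode(...) call):

import java.util.concurrent.TimeUnit;

public class DecodeTimingSketch {
  public static void main(String[] args) {
    long start = System.nanoTime();
    burnCpu();  // hypothetical stand-in for the EC decode step
    long elapsed = System.nanoTime() - start;
    System.out.printf("decode took %d ns (%d ms)%n",
        elapsed, TimeUnit.NANOSECONDS.toMillis(elapsed));
  }

  private static void burnCpu() {
    double sink = 0;
    for (int i = 1; i < 5_000_000; i++) {
      sink += Math.sqrt(i);
    }
    // Reference the result so the loop is not optimized away.
    if (sink < 0) System.out.println(sink);
  }
}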
DataNodeMetrics.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode.metrics;
 
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
+import static org.apache.hadoop.metrics2.lib.Interns.info;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -134,6 +135,8 @@ public class DataNodeMetrics {
   MutableCounterLong ecReconstructionTasks;
   @Metric("Count of erasure coding failed reconstruction tasks")
   MutableCounterLong ecFailedReconstructionTasks;
+  // Nanoseconds spent by decoding tasks.
+  MutableCounterLong ecDecodingTimeNanos;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -153,6 +156,9 @@ public class DataNodeMetrics {
     sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
     ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
     ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
+    ecDecodingTimeNanos = registry.newCounter(
+        info("ecDecodingTimeNanos", "Nanoseconds spent by decoding tasks"),
+        (long) 0);
 
     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
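Note that ecDecodingTimeNanos is registered programmatically in the constructor rather than through the declarative @Metric annotation used by the neighboring counters. A hedged sketch of this metrics2 idiom, using only the calls visible in the diff plus MutableCounterLong.value() for inspection:

import static org.apache.hadoop.metrics2.lib.Interns.info;

import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

public class CounterRegistrationSketch {
  public static void main(String[] args) {
    MetricsRegistry registry = new MetricsRegistry("datanode");
    // Programmatic registration, mirroring the constructor change above.
    MutableCounterLong decodingNanos = registry.newCounter(
        info("ecDecodingTimeNanos", "Nanoseconds spent by decoding tasks"),
        0L);
    decodingNanos.incr(42_000L);                // accumulate elapsed nanoseconds
    System.out.println(decodingNanos.value());  // prints 42000
  }
}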
@@ -442,7 +448,10 @@ public class DataNodeMetrics {
   }
 
   public void setDataNodeActiveXceiversCount(int value) {
-    this.dataNodeActiveXceiversCount.set(value);
+    dataNodeActiveXceiversCount.set(value);
   }
 
+  public void incrECDecodingTime(long decodingTimeNanos) {
+    ecDecodingTimeNanos.incr(decodingTimeNanos);
+  }
 }
TestDataNodeErasureCodingMetrics.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,14 +43,15 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
 
-
 /**
  * This file tests the erasure coding metrics in DataNode.
  */
@@ -94,24 +96,37 @@ public class TestDataNodeErasureCodingMetrics {
     DataNode workerDn = doTest("/testEcTasks");
     MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
 
-    // EcReconstructionTasks metric value will be updated in the finally block
-    // of striped reconstruction thread. Here, giving a grace period to finish
-    // EC reconstruction metric updates in DN.
-    LOG.info("Waiting to finish EC reconstruction metric updates in DN");
-    int retries = 0;
-    while (retries < 20) {
-      long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
-      if (taskMetricValue > 0) {
-        break;
-      }
-      Thread.sleep(500);
-      retries++;
-      rb = getMetrics(workerDn.getMetrics().name());
-    }
+    // Ensure that reconstruction task is finished
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
+        return (taskMetricValue > 0);
+      }
+    }, 500, 10000);
 
     assertCounter("EcReconstructionTasks", (long) 1, rb);
     assertCounter("EcFailedReconstructionTasks", (long) 0, rb);
   }
 
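This hunk replaces a hand-rolled sleep-and-retry loop with GenericTestUtils.waitFor, which polls a Supplier<Boolean> every checkEveryMillis milliseconds and throws a TimeoutException if the condition is not met within waitForMillis. A self-contained sketch of the idiom (the counter-based condition is a placeholder for the metric check):

import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Supplier;
import org.apache.hadoop.test.GenericTestUtils;

public class WaitForSketch {
  public static void main(String[] args) throws Exception {
    final AtomicInteger polls = new AtomicInteger();
    // Poll every 500 ms; fail with TimeoutException after 10 s if never true.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        // Placeholder condition: becomes true on the third poll.
        return polls.incrementAndGet() >= 3;
      }
    }, 500, 10000);
    System.out.println("condition met after " + polls.get() + " polls");
  }
}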
+  @Test(timeout = 120000)
+  public void testEcCodingTime() throws Exception {
+    DataNode workerDn = doTest("/testEcCodingTime");
+    MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
+
+    // Ensure that reconstruction task is finished
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
+        return (taskMetricValue > 0);
+      }
+    }, 500, 10000);
+
+    long decodeTime = getLongCounter("ecDecodingTimeNanos", rb);
+    Assert.assertTrue(decodeTime > 0);
+  }
+
   private DataNode doTest(String fileName) throws Exception {
     Path file = new Path(fileName);
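For completeness, a hedged sketch of the MetricsAsserts helpers the test relies on: getMetrics(name) snapshots a named metrics source into a MetricsRecordBuilder, and getLongCounter/assertCounter read values back from it. The method below is hypothetical and assumes it is called with a DataNode from a running MiniDFSCluster, as doTest provides:

import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.Assert;

public class MetricsReadSketch {
  // Snapshot a live DataNode's metrics source and read the new counter.
  static void checkDecodingMetrics(DataNode dn) {
    MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
    assertCounter("EcReconstructionTasks", 1L, rb);
    long decodeNanos = getLongCounter("ecDecodingTimeNanos", rb);
    Assert.assertTrue("expected some decode time", decodeNanos > 0);
  }
}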