Erasure Coding: metrics xmitsInProgress become to negative. Contributed by maobaolong and Toshihiko Uchida.

This commit is contained in:
Ayush Saxena 2020-05-03 19:09:14 +05:30
parent 44de193bec
commit d6fc482a54
4 changed files with 19 additions and 1 deletions

View File

@ -170,4 +170,8 @@ public void shutDown() {
stripedReconstructionPool.shutdown(); stripedReconstructionPool.shutdown();
stripedReadPool.shutdown(); stripedReadPool.shutdown();
} }
public float getXmitWeight() {
return xmitWeight;
}
} }

View File

@ -67,7 +67,11 @@ public void run() {
LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e); LOG.warn("Failed to reconstruct striped block: {}", getBlockGroup(), e);
getDatanode().getMetrics().incrECFailedReconstructionTasks(); getDatanode().getMetrics().incrECFailedReconstructionTasks();
} finally { } finally {
getDatanode().decrementXmitsInProgress(getXmits()); float xmitWeight = getErasureCodingWorker().getXmitWeight();
// if the xmits is smaller than 1, the xmitsSubmitted should be set to 1
// because if it set to zero, we cannot to measure the xmits submitted
int xmitsSubmitted = Math.max((int) (getXmits() * xmitWeight), 1);
getDatanode().decrementXmitsInProgress(xmitsSubmitted);
final DataNodeMetrics metrics = getDatanode().getMetrics(); final DataNodeMetrics metrics = getDatanode().getMetrics();
metrics.incrECReconstructionTasks(); metrics.incrECReconstructionTasks();
metrics.incrECReconstructionBytesRead(getBytesRead()); metrics.incrECReconstructionBytesRead(getBytesRead());

View File

@ -275,4 +275,8 @@ Configuration getConf() {
DataNode getDatanode() { DataNode getDatanode() {
return datanode; return datanode;
} }
public ErasureCodingWorker getErasureCodingWorker() {
return erasureCodingWorker;
}
} }

View File

@ -514,6 +514,8 @@ private void testNNSendsErasureCodingTasks(int deadDN) throws Exception {
@Test(timeout = 180000) @Test(timeout = 180000)
public void testErasureCodingWorkerXmitsWeight() throws Exception { public void testErasureCodingWorkerXmitsWeight() throws Exception {
testErasureCodingWorkerXmitsWeight(0.5f,
(int) (ecPolicy.getNumDataUnits() * 0.5f));
testErasureCodingWorkerXmitsWeight(1f, ecPolicy.getNumDataUnits()); testErasureCodingWorkerXmitsWeight(1f, ecPolicy.getNumDataUnits());
testErasureCodingWorkerXmitsWeight(0f, 1); testErasureCodingWorkerXmitsWeight(0f, 1);
testErasureCodingWorkerXmitsWeight(10f, 10 * ecPolicy.getNumDataUnits()); testErasureCodingWorkerXmitsWeight(10f, 10 * ecPolicy.getNumDataUnits());
@ -567,6 +569,10 @@ public void stripedBlockReconstruction() throws IOException {
} finally { } finally {
barrier.await(); barrier.await();
DataNodeFaultInjector.set(oldInjector); DataNodeFaultInjector.set(oldInjector);
for (final DataNode curDn : cluster.getDataNodes()) {
GenericTestUtils.waitFor(() -> curDn.getXceiverCount() <= 1, 10, 60000);
assertEquals(0, curDn.getXmitsInProgress());
}
} }
} }
} }