HDFS-8391. NN should consider current EC tasks handling count from DN while assigning new tasks. Contributed by Uma Maheswara Rao G.
This commit is contained in:
parent
bba15e06d2
commit
c99c337928
@ -212,3 +212,6 @@
|
|||||||
|
|
||||||
HDFS-8364. Erasure coding: fix some minor bugs in EC CLI
|
HDFS-8364. Erasure coding: fix some minor bugs in EC CLI
|
||||||
(Walter Su via vinayakumarb)
|
(Walter Su via vinayakumarb)
|
||||||
|
|
||||||
|
HDFS-8391. NN should consider current EC tasks handling count from DN while
|
||||||
|
assigning new tasks. (umamahesh)
|
||||||
|
@ -1910,6 +1910,21 @@ int getXmitsInProgress() {
|
|||||||
return xmitsInProgress.get();
|
return xmitsInProgress.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments the xmitsInProgress count. xmitsInProgress count represents the
|
||||||
|
* number of data replication/reconstruction tasks running currently.
|
||||||
|
*/
|
||||||
|
public void incrementXmitsInProgress() {
|
||||||
|
xmitsInProgress.getAndIncrement();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decrements the xmitsInProgress count
|
||||||
|
*/
|
||||||
|
public void decrementXmitsInProgress() {
|
||||||
|
xmitsInProgress.getAndDecrement();
|
||||||
|
}
|
||||||
|
|
||||||
private void reportBadBlock(final BPOfferService bpos,
|
private void reportBadBlock(final BPOfferService bpos,
|
||||||
final ExtendedBlock block, final String msg) {
|
final ExtendedBlock block, final String msg) {
|
||||||
FsVolumeSpi volume = getFSDataset().getVolume(block);
|
FsVolumeSpi volume = getFSDataset().getVolume(block);
|
||||||
@ -2128,7 +2143,7 @@ private class DataTransfer implements Runnable {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
xmitsInProgress.getAndIncrement();
|
incrementXmitsInProgress();
|
||||||
Socket sock = null;
|
Socket sock = null;
|
||||||
DataOutputStream out = null;
|
DataOutputStream out = null;
|
||||||
DataInputStream in = null;
|
DataInputStream in = null;
|
||||||
@ -2207,7 +2222,7 @@ public void run() {
|
|||||||
// check if there are any disk problem
|
// check if there are any disk problem
|
||||||
checkDiskErrorAsync();
|
checkDiskErrorAsync();
|
||||||
} finally {
|
} finally {
|
||||||
xmitsInProgress.getAndDecrement();
|
decrementXmitsInProgress();
|
||||||
IOUtils.closeStream(blockSender);
|
IOUtils.closeStream(blockSender);
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
IOUtils.closeStream(in);
|
IOUtils.closeStream(in);
|
||||||
|
@ -312,6 +312,7 @@ private long getBlockLen(ExtendedBlock blockGroup, int i) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
datanode.incrementXmitsInProgress();
|
||||||
try {
|
try {
|
||||||
// Store the indices of successfully read source
|
// Store the indices of successfully read source
|
||||||
// This will be updated after doing real read.
|
// This will be updated after doing real read.
|
||||||
@ -397,8 +398,9 @@ public void run() {
|
|||||||
// Currently we don't check the acks for packets, this is similar as
|
// Currently we don't check the acks for packets, this is similar as
|
||||||
// block replication.
|
// block replication.
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
LOG.warn("Failed to recover striped block: " + blockGroup);
|
LOG.warn("Failed to recover striped block: " + blockGroup, e);
|
||||||
} finally {
|
} finally {
|
||||||
|
datanode.decrementXmitsInProgress();
|
||||||
// close block readers
|
// close block readers
|
||||||
for (StripedReader stripedReader : stripedReaders) {
|
for (StripedReader stripedReader : stripedReaders) {
|
||||||
closeBlockReader(stripedReader.blockReader);
|
closeBlockReader(stripedReader.blockReader);
|
||||||
|
Loading…
Reference in New Issue
Block a user