HDFS-13339. Volume reference can't be released and may lead to deadlock when DataXceiver does a check volume. Contributed by liaoyuxiangqin and Zsolt Venczel.
This commit is contained in:
parent
9c4cbed8d1
commit
9efb4b7db0
@ -46,6 +46,7 @@
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
@ -103,6 +104,8 @@ public class DatasetVolumeChecker {
|
|||||||
private static final VolumeCheckContext IGNORED_CONTEXT =
|
private static final VolumeCheckContext IGNORED_CONTEXT =
|
||||||
new VolumeCheckContext();
|
new VolumeCheckContext();
|
||||||
|
|
||||||
|
private final ExecutorService checkVolumeResultHandlerExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param conf Configuration object.
|
* @param conf Configuration object.
|
||||||
* @param timer {@link Timer} object used for throttling checks.
|
* @param timer {@link Timer} object used for throttling checks.
|
||||||
@ -163,6 +166,12 @@ public DatasetVolumeChecker(Configuration conf, Timer timer)
|
|||||||
.setNameFormat("DataNode DiskChecker thread %d")
|
.setNameFormat("DataNode DiskChecker thread %d")
|
||||||
.setDaemon(true)
|
.setDaemon(true)
|
||||||
.build()));
|
.build()));
|
||||||
|
|
||||||
|
checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool(
|
||||||
|
new ThreadFactoryBuilder()
|
||||||
|
.setNameFormat("VolumeCheck ResultHandler thread %d")
|
||||||
|
.setDaemon(true)
|
||||||
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -292,7 +301,9 @@ public boolean checkVolume(
|
|||||||
numVolumeChecks.incrementAndGet();
|
numVolumeChecks.incrementAndGet();
|
||||||
Futures.addCallback(olf.get(),
|
Futures.addCallback(olf.get(),
|
||||||
new ResultHandler(volumeReference, new HashSet<>(), new HashSet<>(),
|
new ResultHandler(volumeReference, new HashSet<>(), new HashSet<>(),
|
||||||
new AtomicLong(1), callback));
|
new AtomicLong(1), callback),
|
||||||
|
checkVolumeResultHandlerExecutorService
|
||||||
|
);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
IOUtils.cleanup(null, volumeReference);
|
IOUtils.cleanup(null, volumeReference);
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.VolumeCheckContext;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
import org.apache.hadoop.util.FakeTimer;
|
import org.apache.hadoop.util.FakeTimer;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
@ -122,6 +123,8 @@ public void call(Set<FsVolumeSpi> healthyVolumes,
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(() -> numCallbackInvocations.get() > 0, 5, 10000);
|
||||||
|
|
||||||
// Ensure that the check was invoked at least once.
|
// Ensure that the check was invoked at least once.
|
||||||
verify(volume, times(1)).check(anyObject());
|
verify(volume, times(1)).check(anyObject());
|
||||||
if (result) {
|
if (result) {
|
||||||
|
Loading…
Reference in New Issue
Block a user