HDFS-15068. DataNode could meet deadlock if invoke refreshVolumes when register. Contributed by Aiphago.
Signed-off-by: Masatake Iwasaki <iwasakims@apache.org>
This commit is contained in:
parent
b19d87c2b7
commit
037ec8cfb1
@ -421,7 +421,7 @@ void registrationSucceeded(BPServiceActor bpServiceActor,
|
||||
reg.getStorageInfo().getClusterID(), "cluster ID");
|
||||
}
|
||||
bpRegistration = reg;
|
||||
|
||||
DataNodeFaultInjector.get().delayWhenOfferServiceHoldLock();
|
||||
dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
|
||||
// Add the initial block token secret keys to the DN's secret manager.
|
||||
if (dn.isBlockTokenEnabled) {
|
||||
|
@ -765,7 +765,13 @@ ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
|
||||
* @throws IOException on error. If an IOException is thrown, some new volumes
|
||||
* may have been successfully added and removed.
|
||||
*/
|
||||
private synchronized void refreshVolumes(String newVolumes) throws IOException {
|
||||
private void refreshVolumes(String newVolumes) throws IOException {
|
||||
// Add volumes for each Namespace
|
||||
final List<NamespaceInfo> nsInfos = Lists.newArrayList();
|
||||
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||
nsInfos.add(bpos.getNamespaceInfo());
|
||||
}
|
||||
synchronized(this) {
|
||||
Configuration conf = getConf();
|
||||
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
|
||||
ExecutorService service = null;
|
||||
@ -787,14 +793,10 @@ private synchronized void refreshVolumes(String newVolumes) throws IOException {
|
||||
LOG.info("Adding new volumes: {}",
|
||||
Joiner.on(",").join(changedVolumes.newLocations));
|
||||
|
||||
// Add volumes for each Namespace
|
||||
final List<NamespaceInfo> nsInfos = Lists.newArrayList();
|
||||
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
|
||||
nsInfos.add(bpos.getNamespaceInfo());
|
||||
}
|
||||
service = Executors
|
||||
.newFixedThreadPool(changedVolumes.newLocations.size());
|
||||
List<Future<IOException>> exceptions = Lists.newArrayList();
|
||||
|
||||
for (final StorageLocation location : changedVolumes.newLocations) {
|
||||
exceptions.add(service.submit(new Callable<IOException>() {
|
||||
@Override
|
||||
@ -851,6 +853,7 @@ public IOException call() {
|
||||
dataDirs = getStorageLocations(conf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove volumes from DataNode.
|
||||
|
@ -95,4 +95,9 @@ public void throwTooManyOpenFiles() throws FileNotFoundException {
|
||||
* process.
|
||||
*/
|
||||
public void stripedBlockReconstruction() throws IOException {}
|
||||
|
||||
/**
|
||||
* Test hook called while BPOfferService holds its lock; no-op by default.
|
||||
*/
|
||||
public void delayWhenOfferServiceHoldLock() {}
|
||||
}
|
||||
|
@ -36,6 +36,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
@ -72,6 +73,7 @@
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.AddBlockPoolException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
@ -405,6 +407,50 @@ public void testVolumeFailureRecoveredByHotSwappingVolume()
|
||||
assertTrue(dn0.shouldRun());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test {@link DataNode#refreshVolumes(String)} not deadLock with
|
||||
* {@link BPOfferService#registrationSucceeded(BPServiceActor,
|
||||
* DatanodeRegistration)}.
|
||||
*/
|
||||
@Test(timeout=10000)
|
||||
public void testRefreshDeadLock() throws Exception {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
DataNodeFaultInjector.set(new DataNodeFaultInjector() {
|
||||
public void delayWhenOfferServiceHoldLock() {
|
||||
try {
|
||||
latch.await();
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
File volume = cluster.getInstanceStorageDir(0, 0);
|
||||
String dataDirs = volume.getPath();
|
||||
List<BPOfferService> allBpOs = dn.getAllBpOs();
|
||||
BPOfferService service = allBpOs.get(0);
|
||||
BPServiceActor actor = service.getBPServiceActors().get(0);
|
||||
DatanodeRegistration bpRegistration = actor.getBpRegistration();
|
||||
|
||||
Thread register = new Thread(() -> {
|
||||
try {
|
||||
service.registrationSucceeded(actor, bpRegistration);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
register.start();
|
||||
String newdir = dataDirs + "tmp";
|
||||
// Make sure service have get writelock
|
||||
latch.countDown();
|
||||
String result = dn.reconfigurePropertyImpl(
|
||||
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newdir);
|
||||
assertNotNull(result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test changing the number of volumes does not impact the disk failure
|
||||
* tolerance.
|
||||
|
Loading…
Reference in New Issue
Block a user