HDFS-9445. Datanode may deadlock while handling a bad volume. Contributed by Walter Su.
This commit is contained in:
parent
e363417e7b
commit
a48301791e
@ -2574,6 +2574,9 @@ Release 2.7.2 - UNRELEASED
|
|||||||
HDFS-9294. DFSClient deadlock when close file and failed to renew lease.
|
HDFS-9294. DFSClient deadlock when close file and failed to renew lease.
|
||||||
(Brahma Reddy Battula via szetszwo)
|
(Brahma Reddy Battula via szetszwo)
|
||||||
|
|
||||||
|
HDFS-9445. Datanode may deadlock while handling a bad volume.
|
||||||
|
(Wlater Su via Kihwal)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -470,48 +470,67 @@ public void addVolume(final StorageLocation location,
|
|||||||
* Removes a set of volumes from FsDataset.
|
* Removes a set of volumes from FsDataset.
|
||||||
* @param volumesToRemove a set of absolute root path of each volume.
|
* @param volumesToRemove a set of absolute root path of each volume.
|
||||||
* @param clearFailure set true to clear failure information.
|
* @param clearFailure set true to clear failure information.
|
||||||
*
|
|
||||||
* DataNode should call this function before calling
|
|
||||||
* {@link DataStorage#removeVolumes(java.util.Collection)}.
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public synchronized void removeVolumes(
|
public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
|
||||||
Set<File> volumesToRemove, boolean clearFailure) {
|
|
||||||
// Make sure that all volumes are absolute path.
|
// Make sure that all volumes are absolute path.
|
||||||
for (File vol : volumesToRemove) {
|
for (File vol : volumesToRemove) {
|
||||||
Preconditions.checkArgument(vol.isAbsolute(),
|
Preconditions.checkArgument(vol.isAbsolute(),
|
||||||
String.format("%s is not absolute path.", vol.getPath()));
|
String.format("%s is not absolute path.", vol.getPath()));
|
||||||
}
|
}
|
||||||
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
|
||||||
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
|
||||||
final File absRoot = sd.getRoot().getAbsoluteFile();
|
|
||||||
if (volumesToRemove.contains(absRoot)) {
|
|
||||||
LOG.info("Removing " + absRoot + " from FsDataset.");
|
|
||||||
|
|
||||||
// Disable the volume from the service.
|
Map<String, List<ReplicaInfo>> blkToInvalidate = new HashMap<>();
|
||||||
asyncDiskService.removeVolume(sd.getCurrentDir());
|
List<String> storageToRemove = new ArrayList<>();
|
||||||
volumes.removeVolume(absRoot, clearFailure);
|
synchronized (this) {
|
||||||
|
for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
|
||||||
|
Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
|
||||||
|
final File absRoot = sd.getRoot().getAbsoluteFile();
|
||||||
|
if (volumesToRemove.contains(absRoot)) {
|
||||||
|
LOG.info("Removing " + absRoot + " from FsDataset.");
|
||||||
|
|
||||||
// Removed all replica information for the blocks on the volume. Unlike
|
// Disable the volume from the service.
|
||||||
// updating the volumeMap in addVolume(), this operation does not scan
|
asyncDiskService.removeVolume(sd.getCurrentDir());
|
||||||
// disks.
|
volumes.removeVolume(absRoot, clearFailure);
|
||||||
for (String bpid : volumeMap.getBlockPoolList()) {
|
|
||||||
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
// Removed all replica information for the blocks on the volume.
|
||||||
it.hasNext(); ) {
|
// Unlike updating the volumeMap in addVolume(), this operation does
|
||||||
ReplicaInfo block = it.next();
|
// not scan disks.
|
||||||
final File absBasePath =
|
for (String bpid : volumeMap.getBlockPoolList()) {
|
||||||
new File(block.getVolume().getBasePath()).getAbsoluteFile();
|
List<ReplicaInfo> blocks = new ArrayList<>();
|
||||||
if (absBasePath.equals(absRoot)) {
|
for (Iterator<ReplicaInfo> it = volumeMap.replicas(bpid).iterator();
|
||||||
invalidate(bpid, block);
|
it.hasNext(); ) {
|
||||||
it.remove();
|
ReplicaInfo block = it.next();
|
||||||
|
final File absBasePath =
|
||||||
|
new File(block.getVolume().getBasePath()).getAbsoluteFile();
|
||||||
|
if (absBasePath.equals(absRoot)) {
|
||||||
|
blocks.add(block);
|
||||||
|
it.remove();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
blkToInvalidate.put(bpid, blocks);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
storageMap.remove(sd.getStorageUuid());
|
storageToRemove.add(sd.getStorageUuid());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
setupAsyncLazyPersistThreads();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call this outside the lock.
|
||||||
|
for (Map.Entry<String, List<ReplicaInfo>> entry :
|
||||||
|
blkToInvalidate.entrySet()) {
|
||||||
|
String bpid = entry.getKey();
|
||||||
|
List<ReplicaInfo> blocks = entry.getValue();
|
||||||
|
for (ReplicaInfo block : blocks) {
|
||||||
|
invalidate(bpid, block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized (this) {
|
||||||
|
for(String storageUuid : storageToRemove) {
|
||||||
|
storageMap.remove(storageUuid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
setupAsyncLazyPersistThreads();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private StorageType getStorageTypeFromLocations(
|
private StorageType getStorageTypeFromLocations(
|
||||||
@ -1931,15 +1950,11 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
|
|||||||
public void invalidate(String bpid, ReplicaInfo block) {
|
public void invalidate(String bpid, ReplicaInfo block) {
|
||||||
// If a DFSClient has the replica in its cache of short-circuit file
|
// If a DFSClient has the replica in its cache of short-circuit file
|
||||||
// descriptors (and the client is using ShortCircuitShm), invalidate it.
|
// descriptors (and the client is using ShortCircuitShm), invalidate it.
|
||||||
// The short-circuit registry is null in the unit tests, because the
|
datanode.getShortCircuitRegistry().processBlockInvalidation(
|
||||||
// datanode is mock object.
|
new ExtendedBlockId(block.getBlockId(), bpid));
|
||||||
if (datanode.getShortCircuitRegistry() != null) {
|
|
||||||
datanode.getShortCircuitRegistry().processBlockInvalidation(
|
|
||||||
new ExtendedBlockId(block.getBlockId(), bpid));
|
|
||||||
|
|
||||||
// If the block is cached, start uncaching it.
|
// If the block is cached, start uncaching it.
|
||||||
cacheManager.uncacheBlock(bpid, block.getBlockId());
|
cacheManager.uncacheBlock(bpid, block.getBlockId());
|
||||||
}
|
|
||||||
|
|
||||||
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
|
datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, block),
|
||||||
block.getStorageUuid());
|
block.getStorageUuid());
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
|
||||||
@ -147,6 +148,9 @@ public void setUp() throws IOException {
|
|||||||
when(datanode.getDnConf()).thenReturn(dnConf);
|
when(datanode.getDnConf()).thenReturn(dnConf);
|
||||||
final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
|
final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
|
||||||
when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
|
when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
|
||||||
|
final ShortCircuitRegistry shortCircuitRegistry =
|
||||||
|
new ShortCircuitRegistry(conf);
|
||||||
|
when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
|
||||||
|
|
||||||
createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
|
createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
|
||||||
dataset = new FsDatasetImpl(datanode, storage, conf);
|
dataset = new FsDatasetImpl(datanode, storage, conf);
|
||||||
|
Loading…
Reference in New Issue
Block a user