HDFS-10830. FsDatasetImpl#removeVolumes crashes with IllegalMonitorStateException when vol being removed is in use. (Arpit Agarwal and Manoj Govindassamy)

This commit is contained in:
Arpit Agarwal 2016-09-10 18:22:25 -07:00
parent bee9f57f5c
commit a99bf26a08
4 changed files with 33 additions and 20 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.util; package org.apache.hadoop.util;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -135,4 +136,11 @@ boolean isLocked() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
/**
* See {@link ReentrantLock#newCondition()}.
* @return the Condition object
*/
public Condition newCondition() {
return lock.newCondition();
}
} }

View File

@ -40,6 +40,7 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException; import javax.management.NotCompliantMBeanException;
@ -269,6 +270,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
private final int maxDataLength; private final int maxDataLength;
private final AutoCloseableLock datasetLock; private final AutoCloseableLock datasetLock;
private final Condition datasetLockCondition;
/** /**
* An FSDataset has a directory where it loads its data files. * An FSDataset has a directory where it loads its data files.
@ -287,6 +289,8 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT, DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS),
300)); 300));
this.datasetLockCondition = datasetLock.newCondition();
// The number of volumes required for operation is the total number // The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate. // of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated(); volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@ -523,7 +527,7 @@ public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
// Disable the volume from the service. // Disable the volume from the service.
asyncDiskService.removeVolume(sd.getCurrentDir()); asyncDiskService.removeVolume(sd.getCurrentDir());
volumes.removeVolume(absRoot, clearFailure); volumes.removeVolume(absRoot, clearFailure);
volumes.waitVolumeRemoved(5000, this); volumes.waitVolumeRemoved(5000, datasetLockCondition);
// Removed all replica information for the blocks on the volume. // Removed all replica information for the blocks on the volume.
// Unlike updating the volumeMap in addVolume(), this operation does // Unlike updating the volumeMap in addVolume(), this operation does

View File

@ -31,6 +31,8 @@
import java.util.TreeMap; import java.util.TreeMap;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
@ -41,6 +43,7 @@
import org.apache.hadoop.hdfs.server.datanode.BlockScanner; import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -52,7 +55,8 @@ class FsVolumeList {
Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>()); Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
private final ConcurrentLinkedQueue<FsVolumeImpl> volumesBeingRemoved = private final ConcurrentLinkedQueue<FsVolumeImpl> volumesBeingRemoved =
new ConcurrentLinkedQueue<>(); new ConcurrentLinkedQueue<>();
private Object checkDirsMutex = new Object(); private final AutoCloseableLock checkDirsLock;
private final Condition checkDirsLockCondition;
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser; private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private final BlockScanner blockScanner; private final BlockScanner blockScanner;
@ -62,6 +66,8 @@ class FsVolumeList {
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) { VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
this.blockChooser = blockChooser; this.blockChooser = blockChooser;
this.blockScanner = blockScanner; this.blockScanner = blockScanner;
this.checkDirsLock = new AutoCloseableLock();
this.checkDirsLockCondition = checkDirsLock.newCondition();
for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) { for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(), volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
volumeFailureInfo); volumeFailureInfo);
@ -224,12 +230,12 @@ public void run() {
/** /**
* Calls {@link FsVolumeImpl#checkDirs()} on each volume. * Calls {@link FsVolumeImpl#checkDirs()} on each volume.
* *
* Use checkDirsMutext to allow only one instance of checkDirs() call * Use {@link checkDirsLock} to allow only one instance of checkDirs() call.
* *
* @return list of all the failed volumes. * @return list of all the failed volumes.
*/ */
Set<File> checkDirs() { Set<File> checkDirs() {
synchronized(checkDirsMutex) { try (AutoCloseableLock lock = checkDirsLock.acquire()) {
Set<File> failedVols = null; Set<File> failedVols = null;
// Make a copy of volumes for performing modification // Make a copy of volumes for performing modification
@ -260,7 +266,7 @@ Set<File> checkDirs() {
+ " failure volumes."); + " failure volumes.");
} }
waitVolumeRemoved(5000, checkDirsMutex); waitVolumeRemoved(5000, checkDirsLockCondition);
return failedVols; return failedVols;
} }
} }
@ -271,13 +277,13 @@ Set<File> checkDirs() {
* *
* @param sleepMillis interval to recheck. * @param sleepMillis interval to recheck.
*/ */
void waitVolumeRemoved(int sleepMillis, Object monitor) { void waitVolumeRemoved(int sleepMillis, Condition condition) {
while (!checkVolumesRemoved()) { while (!checkVolumesRemoved()) {
if (FsDatasetImpl.LOG.isDebugEnabled()) { if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Waiting for volume reference to be released."); FsDatasetImpl.LOG.debug("Waiting for volume reference to be released.");
} }
try { try {
monitor.wait(sleepMillis); condition.await(sleepMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
FsDatasetImpl.LOG.info("Thread interrupted when waiting for " FsDatasetImpl.LOG.info("Thread interrupted when waiting for "
+ "volume reference to be released."); + "volume reference to be released.");

View File

@ -636,24 +636,19 @@ public void run() {
class VolRemoveThread extends Thread { class VolRemoveThread extends Thread {
public void run() { public void run() {
try {
Set<File> volumesToRemove = new HashSet<>(); Set<File> volumesToRemove = new HashSet<>();
try {
volumesToRemove.add(StorageLocation.parse( volumesToRemove.add(StorageLocation.parse(
dataset.getVolume(eb).getBasePath()).getFile()); dataset.getVolume(eb).getBasePath()).getFile());
/** } catch (Exception e) {
* TODO: {@link FsDatasetImpl#removeVolumes(Set, boolean)} is throwing LOG.info("Problem preparing volumes to remove: ", e);
* IllegalMonitorStateException when there is a parallel reader/writer Assert.fail("Exception in remove volume thread, check log for " +
* to the volume. Remove below exception handling block after fixing "details.");
* HDFS-10830. }
*/
LOG.info("Removing volume " + volumesToRemove); LOG.info("Removing volume " + volumesToRemove);
dataset.removeVolumes(volumesToRemove, true); dataset.removeVolumes(volumesToRemove, true);
volRemoveCompletedLatch.countDown(); volRemoveCompletedLatch.countDown();
LOG.info("Removed volume " + volumesToRemove); LOG.info("Removed volume " + volumesToRemove);
} catch (Exception e) {
LOG.info("Unexpected issue while removing volume: ", e);
volRemoveCompletedLatch.countDown();
}
} }
} }