From a99bf26a0899bcc4307c3a242c8414eaef555aa7 Mon Sep 17 00:00:00 2001
From: Arpit Agarwal
Date: Sat, 10 Sep 2016 18:22:25 -0700
Subject: [PATCH] HDFS-10830. FsDatasetImpl#removeVolumes crashes with
 IllegalMonitorStateException when vol being removed is in use. (Arpit
 Agarwal and Manoj Govindassamy)

---
 .../apache/hadoop/util/AutoCloseableLock.java |  8 +++++++
 .../fsdataset/impl/FsDatasetImpl.java         |  6 +++++-
 .../datanode/fsdataset/impl/FsVolumeList.java | 18 ++++++++++------
 .../fsdataset/impl/TestFsDatasetImpl.java     | 21 +++++++------------
 4 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
index d920bc63c1..d7fe93d73c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.util;
 
 import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -135,4 +136,11 @@ boolean isLocked() {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * See {@link ReentrantLock#newCondition()}.
+   * @return the Condition object
+   */
+  public Condition newCondition() {
+    return lock.newCondition();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index e5da0e5b1c..e9f1dc13a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -40,6 +40,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.TimeUnit;
 
 import javax.management.NotCompliantMBeanException;
@@ -269,6 +270,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
   private final int maxDataLength;
 
   private final AutoCloseableLock datasetLock;
+  private final Condition datasetLockCondition;
 
   /**
    * An FSDataset has a directory where it loads its data files.
@@ -287,6 +289,8 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
         DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS),
         300));
+    this.datasetLockCondition = datasetLock.newCondition();
+
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@@ -523,7 +527,7 @@ public void removeVolumes(Set<File> volumesToRemove, boolean clearFailure) {
       // Disable the volume from the service.
       asyncDiskService.removeVolume(sd.getCurrentDir());
       volumes.removeVolume(absRoot, clearFailure);
-      volumes.waitVolumeRemoved(5000, this);
+      volumes.waitVolumeRemoved(5000, datasetLockCondition);
 
       // Removed all replica information for the blocks on the volume.
       // Unlike updating the volumeMap in addVolume(), this operation does
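Why the one-line change in removeVolumes() matters: Object.wait() is only
legal while the calling thread holds that object's built-in monitor, i.e.
inside a synchronized block on the same object. Once the dataset lock became
an AutoCloseableLock (a ReentrantLock wrapper), removeVolumes() holds no
monitor, so the old waitVolumeRemoved(5000, this) call died with
IllegalMonitorStateException. A Condition created from the same lock is the
explicit-lock equivalent of the intrinsic monitor. A minimal standalone
sketch (illustrative names, not HDFS code):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustration only: Object.wait() vs Condition.await() under an
    // explicit lock.
    public class MonitorVsCondition {
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition released = lock.newCondition();

      public void brokenWait() throws InterruptedException {
        lock.lock();
        try {
          // Holds the ReentrantLock but NOT this object's built-in monitor,
          // so this throws IllegalMonitorStateException at runtime.
          this.wait(5000);
        } finally {
          lock.unlock();
        }
      }

      public void fixedWait() throws InterruptedException {
        lock.lock();
        try {
          // Legal: the Condition belongs to the lock this thread holds.
          // await() releases the lock while waiting and reacquires it.
          released.await(5000, TimeUnit.MILLISECONDS);
        } finally {
          lock.unlock();
        }
      }
    }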
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index ea4d5975cd..634ad42d89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -31,6 +31,8 @@
 import java.util.TreeMap;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
@@ -41,6 +43,7 @@
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 
@@ -52,7 +55,8 @@ class FsVolumeList {
       Collections.synchronizedMap(new TreeMap<String, VolumeFailureInfo>());
   private final ConcurrentLinkedQueue<FsVolumeImpl> volumesBeingRemoved =
       new ConcurrentLinkedQueue<>();
-  private Object checkDirsMutex = new Object();
+  private final AutoCloseableLock checkDirsLock;
+  private final Condition checkDirsLockCondition;
 
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
   private final BlockScanner blockScanner;
@@ -62,6 +66,8 @@ class FsVolumeList {
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
     this.blockChooser = blockChooser;
     this.blockScanner = blockScanner;
+    this.checkDirsLock = new AutoCloseableLock();
+    this.checkDirsLockCondition = checkDirsLock.newCondition();
     for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
       volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
           volumeFailureInfo);
@@ -224,12 +230,12 @@ public void run() {
   /**
    * Calls {@link FsVolumeImpl#checkDirs()} on each volume.
    *
-   * Use checkDirsMutext to allow only one instance of checkDirs() call
+   * Use {@link checkDirsLock} to allow only one instance of checkDirs() call.
   *
    * @return list of all the failed volumes.
    */
   Set<FsVolumeImpl> checkDirs() {
-    synchronized(checkDirsMutex) {
+    try (AutoCloseableLock lock = checkDirsLock.acquire()) {
       Set<FsVolumeImpl> failedVols = null;
 
       // Make a copy of volumes for performing modification
@@ -260,7 +266,7 @@ Set<FsVolumeImpl> checkDirs() {
             + " failure volumes.");
       }
 
-      waitVolumeRemoved(5000, checkDirsMutex);
+      waitVolumeRemoved(5000, checkDirsLockCondition);
       return failedVols;
     }
   }
@@ -271,13 +277,13 @@ Set<FsVolumeImpl> checkDirs() {
    *
    * @param sleepMillis interval to recheck.
    */
-  void waitVolumeRemoved(int sleepMillis, Object monitor) {
+  void waitVolumeRemoved(int sleepMillis, Condition condition) {
     while (!checkVolumesRemoved()) {
       if (FsDatasetImpl.LOG.isDebugEnabled()) {
         FsDatasetImpl.LOG.debug("Waiting for volume reference to be released.");
       }
       try {
-        monitor.wait(sleepMillis);
+        condition.await(sleepMillis, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         FsDatasetImpl.LOG.info("Thread interrupted when waiting for "
             + "volume reference to be released.");
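The checkDirs() rewrite above pairs the new checkDirsLock with
try-with-resources, so the lock is released on every exit path, and waits on
a Condition carved from that same lock. A condensed sketch of the pattern,
assuming only the AutoCloseableLock behavior visible in this patch (acquire()
blocks until the lock is held and returns the lock object; close() unlocks):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import org.apache.hadoop.util.AutoCloseableLock;

    class CheckDirsSketch {
      private final AutoCloseableLock checkDirsLock = new AutoCloseableLock();
      private final Condition checkDirsLockCondition =
          checkDirsLock.newCondition();

      void serializedCheck() throws InterruptedException {
        // acquire() returns the lock itself, so try-with-resources releases
        // it even if the body throws.
        try (AutoCloseableLock lock = checkDirsLock.acquire()) {
          // ... single-threaded volume checking would happen here ...

          // Wait up to 5 seconds on the lock's own Condition; await()
          // releases the lock during the wait and reacquires it before
          // returning, so the close() at the end of the block stays balanced.
          checkDirsLockCondition.await(5000, TimeUnit.MILLISECONDS);
        }
      }
    }

waitVolumeRemoved() above wraps this await in a while (!checkVolumesRemoved())
loop, which absorbs both spurious wakeups and the five-second timeout expiring
before the volume reference is actually released.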
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index b3f04d2436..a330fbfb27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -636,24 +636,19 @@ public void run() {
     class VolRemoveThread extends Thread {
       public void run() {
+        Set<File> volumesToRemove = new HashSet<>();
         try {
-          Set<File> volumesToRemove = new HashSet<>();
           volumesToRemove.add(StorageLocation.parse(
               dataset.getVolume(eb).getBasePath()).getFile());
-          /**
-           * TODO: {@link FsDatasetImpl#removeVolumes(Set, boolean)} is throwing
-           * IllegalMonitorStateException when there is a parallel reader/writer
-           * to the volume. Remove below exception handling block after fixing
-           * HDFS-10830.
-           */
-          LOG.info("Removing volume " + volumesToRemove);
-          dataset.removeVolumes(volumesToRemove, true);
-          volRemoveCompletedLatch.countDown();
-          LOG.info("Removed volume " + volumesToRemove);
         } catch (Exception e) {
-          LOG.info("Unexpected issue while removing volume: ", e);
-          volRemoveCompletedLatch.countDown();
+          LOG.info("Problem preparing volumes to remove: ", e);
+          Assert.fail("Exception in remove volume thread, check log for "
+              + "details.");
         }
+        LOG.info("Removing volume " + volumesToRemove);
+        dataset.removeVolumes(volumesToRemove, true);
+        volRemoveCompletedLatch.countDown();
+        LOG.info("Removed volume " + volumesToRemove);
       }
     }
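With the fix in place, the test thread above no longer needs the defensive
catch around removeVolumes() that the removed TODO described; it signals
completion through volRemoveCompletedLatch so the main test thread only
asserts after removal has finished. A minimal sketch of that latch handshake
(hypothetical names; the FsDataset calls and assertions are elided):

    import java.util.concurrent.CountDownLatch;

    // Illustration of the one-shot latch handshake used by the test.
    public class LatchHandshake {
      public static void main(String[] args) throws InterruptedException {
        final CountDownLatch volRemoveCompletedLatch = new CountDownLatch(1);

        Thread volRemoveThread = new Thread(() -> {
          // ... dataset.removeVolumes(volumesToRemove, true); ...
          volRemoveCompletedLatch.countDown(); // signal: removal finished
        });

        volRemoveThread.start();
        volRemoveCompletedLatch.await(); // block until the remover signals
        // ... the test can now assert on the dataset's final state ...
      }
    }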