From f678080dbd25a218e0406463a3c3a1fc03680702 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Tue, 20 Dec 2016 13:53:07 -0800 Subject: [PATCH] HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal. --- .../hadoop/hdfs/server/datanode/DataNode.java | 130 ++++++++---------- .../checker/DatasetVolumeChecker.java | 118 ++++++++++------ .../datanode/fsdataset/FsDatasetSpi.java | 3 +- .../fsdataset/impl/FsDatasetImpl.java | 5 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 7 - .../datanode/fsdataset/impl/FsVolumeList.java | 25 +--- .../blockmanagement/TestBlockStatsMXBean.java | 4 + .../server/datanode/SimulatedFSDataset.java | 18 ++- .../datanode/TestDataNodeHotSwapVolumes.java | 3 + .../datanode/TestDataNodeVolumeFailure.java | 3 + .../TestDataNodeVolumeFailureReporting.java | 3 + .../TestDataNodeVolumeFailureToleration.java | 3 + .../hdfs/server/datanode/TestDiskError.java | 24 ++-- .../checker/TestDatasetVolumeChecker.java | 17 ++- .../TestDatasetVolumeCheckerFailures.java | 45 +++--- .../extdataset/ExternalDatasetImpl.java | 3 +- .../fsdataset/impl/TestFsDatasetImpl.java | 84 ++--------- .../fsdataset/impl/TestFsVolumeList.java | 37 ----- 18 files changed, 234 insertions(+), 298 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 794b1ad1c1..a94c4b1608 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -74,6 +74,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -85,7 +86,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -108,6 +108,7 @@ import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker; import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.client.BlockReportOptions; @@ -369,11 +370,7 @@ public class DataNode extends ReconfigurableBase SaslDataTransferClient saslClient; SaslDataTransferServer saslServer; private ObjectName dataNodeInfoBeanName; - private Thread checkDiskErrorThread = null; - protected final int checkDiskErrorInterval; - private boolean checkDiskErrorFlag = false; - private Object checkDiskErrorMutex = new Object(); - private long lastDiskErrorCheck; + private volatile long lastDiskErrorCheck; private String supergroup; private boolean isPermissionEnabled; private String dnUserName = null; @@ -389,6 +386,7 @@ public class DataNode extends ReconfigurableBase @Nullable private final StorageLocationChecker storageLocationChecker; + private final DatasetVolumeChecker volumeChecker; private final SocketFactory socketFactory; @@ -407,7 +405,7 @@ public class DataNode extends ReconfigurableBase */ @VisibleForTesting @InterfaceAudience.LimitedPrivate("HDFS") - DataNode(final Configuration conf) { + DataNode(final Configuration conf) throws DiskErrorException { super(conf); this.tracer = createTracer(conf); this.tracerConfigurationManager = @@ -420,11 +418,10 @@ public class DataNode extends ReconfigurableBase this.connectToDnViaHostname = false; this.blockScanner = new BlockScanner(this, this.getConf()); this.pipelineSupportECN = false; - this.checkDiskErrorInterval = - ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25)); this.socketFactory = NetUtils.getDefaultSocketFactory(conf); initOOBTimeout(); storageLocationChecker = null; + volumeChecker = new DatasetVolumeChecker(conf, new Timer()); } /** @@ -464,8 +461,7 @@ public class DataNode extends ReconfigurableBase ",hdfs-" + conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED"); - this.checkDiskErrorInterval = - ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25)); + this.volumeChecker = new DatasetVolumeChecker(conf, new Timer()); // Determine whether we should try to pass file descriptors to clients. if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, @@ -1918,11 +1914,6 @@ public class DataNode extends ReconfigurableBase } } - // Interrupt the checkDiskErrorThread and terminate it. - if(this.checkDiskErrorThread != null) { - this.checkDiskErrorThread.interrupt(); - } - // Record the time of initial notification long timeNotified = Time.monotonicNow(); @@ -1944,6 +1935,8 @@ public class DataNode extends ReconfigurableBase } } + volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS); + if (storageLocationChecker != null) { storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS); } @@ -2051,16 +2044,19 @@ public class DataNode extends ReconfigurableBase * Check if there is a disk failure asynchronously and if so, handle the error */ public void checkDiskErrorAsync() { - synchronized(checkDiskErrorMutex) { - checkDiskErrorFlag = true; - if(checkDiskErrorThread == null) { - startCheckDiskErrorThread(); - checkDiskErrorThread.start(); - LOG.info("Starting CheckDiskError Thread"); - } - } + volumeChecker.checkAllVolumesAsync( + data, (healthyVolumes, failedVolumes) -> { + if (failedVolumes.size() > 0) { + LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}", + failedVolumes.size(), failedVolumes); + } else { + LOG.debug("checkDiskErrorAsync: no volume failures detected"); + } + lastDiskErrorCheck = Time.monotonicNow(); + handleVolumeFailures(failedVolumes); + }); } - + private void handleDiskError(String errMsgr) { final boolean hasEnoughResources = data.hasEnoughResource(); LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources); @@ -3208,11 +3204,40 @@ public class DataNode extends ReconfigurableBase } /** - * Check the disk error + * Check the disk error synchronously. */ - private void checkDiskError() { - Set unhealthyLocations = data.checkDataDir(); - if (unhealthyLocations != null && !unhealthyLocations.isEmpty()) { + @VisibleForTesting + public void checkDiskError() throws IOException { + Set unhealthyVolumes; + try { + unhealthyVolumes = volumeChecker.checkAllVolumes(data); + lastDiskErrorCheck = Time.monotonicNow(); + } catch (InterruptedException e) { + LOG.error("Interruped while running disk check", e); + throw new IOException("Interrupted while running disk check", e); + } + + if (unhealthyVolumes.size() > 0) { + LOG.warn("checkDiskError got {} failed volumes - {}", + unhealthyVolumes.size(), unhealthyVolumes); + handleVolumeFailures(unhealthyVolumes); + } else { + LOG.debug("checkDiskError encountered no failures"); + } + } + + private void handleVolumeFailures(Set unhealthyVolumes) { + data.handleVolumeFailures(unhealthyVolumes); + Set unhealthyLocations = new HashSet<>( + unhealthyVolumes.size()); + + if (!unhealthyVolumes.isEmpty()) { + StringBuilder sb = new StringBuilder("DataNode failed volumes:"); + for (FsVolumeSpi vol : unhealthyVolumes) { + unhealthyLocations.add(vol.getStorageLocation()); + sb.append(vol.getStorageLocation()).append(";"); + } + try { // Remove all unhealthy volumes from DataNode. removeVolumes(unhealthyLocations, false); @@ -3220,56 +3245,13 @@ public class DataNode extends ReconfigurableBase LOG.warn("Error occurred when removing unhealthy storage dirs: " + e.getMessage(), e); } - StringBuilder sb = new StringBuilder("DataNode failed volumes:"); - for (StorageLocation location : unhealthyLocations) { - sb.append(location + ";"); - } + LOG.info(sb.toString()); handleDiskError(sb.toString()); } } - /** - * Starts a new thread which will check for disk error check request - * every 5 sec - */ - private void startCheckDiskErrorThread() { - checkDiskErrorThread = new Thread(new Runnable() { - @Override - public void run() { - while(shouldRun) { - boolean tempFlag ; - synchronized(checkDiskErrorMutex) { - tempFlag = checkDiskErrorFlag; - checkDiskErrorFlag = false; - } - if(tempFlag) { - try { - checkDiskError(); - } catch (Exception e) { - LOG.warn("Unexpected exception occurred while checking disk error " + e); - checkDiskErrorThread = null; - return; - } - synchronized(checkDiskErrorMutex) { - lastDiskErrorCheck = Time.monotonicNow(); - } - } - try { - Thread.sleep(checkDiskErrorInterval); - } catch (InterruptedException e) { - LOG.debug("InterruptedException in check disk error thread", e); - checkDiskErrorThread = null; - return; - } - } - } - }); - } - public long getLastDiskErrorCheck() { - synchronized(checkDiskErrorMutex) { - return lastDiskErrorCheck; - } + return lastDiskErrorCheck; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java index 8a57812f3a..ba09d23945 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; @@ -161,37 +160,54 @@ public class DatasetVolumeChecker { * @param dataset - FsDatasetSpi to be checked. * @return set of failed volumes. */ - public Set checkAllVolumes( + public Set checkAllVolumes( final FsDatasetSpi dataset) throws InterruptedException { - - if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) { + final long gap = timer.monotonicNow() - lastAllVolumesCheck; + if (gap < minDiskCheckGapMs) { numSkippedChecks.incrementAndGet(); + LOG.trace( + "Skipped checking all volumes, time since last check {} is less " + + "than the minimum gap between checks ({} ms).", + gap, minDiskCheckGapMs); + return Collections.emptySet(); + } + + final FsDatasetSpi.FsVolumeReferences references = + dataset.getFsVolumeReferences(); + + if (references.size() == 0) { + LOG.warn("checkAllVolumesAsync - no volumes can be referenced"); return Collections.emptySet(); } lastAllVolumesCheck = timer.monotonicNow(); - final Set healthyVolumes = new HashSet<>(); - final Set failedVolumes = new HashSet<>(); - final Set allVolumes = new HashSet<>(); + final Set healthyVolumes = new HashSet<>(); + final Set failedVolumes = new HashSet<>(); + final Set allVolumes = new HashSet<>(); - final FsDatasetSpi.FsVolumeReferences references = - dataset.getFsVolumeReferences(); - final CountDownLatch resultsLatch = new CountDownLatch(references.size()); + final AtomicLong numVolumes = new AtomicLong(references.size()); + final CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < references.size(); ++i) { final FsVolumeReference reference = references.getReference(i); - allVolumes.add(reference.getVolume().getStorageLocation()); + allVolumes.add(reference.getVolume()); ListenableFuture future = delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); LOG.info("Scheduled health check for volume {}", reference.getVolume()); Futures.addCallback(future, new ResultHandler( - reference, healthyVolumes, failedVolumes, resultsLatch, null)); + reference, healthyVolumes, failedVolumes, numVolumes, new Callback() { + @Override + public void call(Set ignored1, + Set ignored2) { + latch.countDown(); + } + })); } // Wait until our timeout elapses, after which we give up on // the remaining volumes. - if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) { + if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) { LOG.warn("checkAllVolumes timed out after {} ms" + maxAllowedTimeForCheckMs); } @@ -225,18 +241,28 @@ public class DatasetVolumeChecker { public boolean checkAllVolumesAsync( final FsDatasetSpi dataset, Callback callback) { - - if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) { + final long gap = timer.monotonicNow() - lastAllVolumesCheck; + if (gap < minDiskCheckGapMs) { numSkippedChecks.incrementAndGet(); + LOG.trace( + "Skipped checking all volumes, time since last check {} is less " + + "than the minimum gap between checks ({} ms).", + gap, minDiskCheckGapMs); + return false; + } + + final FsDatasetSpi.FsVolumeReferences references = + dataset.getFsVolumeReferences(); + + if (references.size() == 0) { + LOG.warn("checkAllVolumesAsync - no volumes can be referenced"); return false; } lastAllVolumesCheck = timer.monotonicNow(); - final Set healthyVolumes = new HashSet<>(); - final Set failedVolumes = new HashSet<>(); - final FsDatasetSpi.FsVolumeReferences references = - dataset.getFsVolumeReferences(); - final CountDownLatch latch = new CountDownLatch(references.size()); + final Set healthyVolumes = new HashSet<>(); + final Set failedVolumes = new HashSet<>(); + final AtomicLong numVolumes = new AtomicLong(references.size()); LOG.info("Checking {} volumes", references.size()); for (int i = 0; i < references.size(); ++i) { @@ -245,7 +271,7 @@ public class DatasetVolumeChecker { ListenableFuture future = delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); Futures.addCallback(future, new ResultHandler( - reference, healthyVolumes, failedVolumes, latch, callback)); + reference, healthyVolumes, failedVolumes, numVolumes, callback)); } numAsyncDatasetChecks.incrementAndGet(); return true; @@ -260,8 +286,8 @@ public class DatasetVolumeChecker { * @param healthyVolumes set of volumes that passed disk checks. * @param failedVolumes set of volumes that failed disk checks. */ - void call(Set healthyVolumes, - Set failedVolumes); + void call(Set healthyVolumes, + Set failedVolumes); } /** @@ -273,8 +299,10 @@ public class DatasetVolumeChecker { * * @param volume the volume that is to be checked. * @param callback callback to be invoked when the volume check completes. + * @return true if the check was scheduled and the callback will be invoked. + * false otherwise. */ - public void checkVolume( + public boolean checkVolume( final FsVolumeSpi volume, Callback callback) { FsVolumeReference volumeReference; @@ -283,14 +311,15 @@ public class DatasetVolumeChecker { } catch (ClosedChannelException e) { // The volume has already been closed. callback.call(new HashSet<>(), new HashSet<>()); - return; + return false; } ListenableFuture future = delegateChecker.schedule(volume, IGNORED_CONTEXT); numVolumeChecks.incrementAndGet(); Futures.addCallback(future, new ResultHandler( volumeReference, new HashSet<>(), new HashSet<>(), - new CountDownLatch(1), callback)); + new AtomicLong(1), callback)); + return true; } /** @@ -299,26 +328,35 @@ public class DatasetVolumeChecker { private class ResultHandler implements FutureCallback { private final FsVolumeReference reference; - private final Set failedVolumes; - private final Set healthyVolumes; - private final CountDownLatch latch; - private final AtomicLong numVolumes; + private final Set failedVolumes; + private final Set healthyVolumes; + private final AtomicLong volumeCounter; @Nullable private final Callback callback; + /** + * + * @param reference FsVolumeReference to be released when the check is + * complete. + * @param healthyVolumes set of healthy volumes. If the disk check is + * successful, add the volume here. + * @param failedVolumes set of failed volumes. If the disk check fails, + * add the volume here. + * @param semaphore semaphore used to trigger callback invocation. + * @param callback invoked when the semaphore can be successfully acquired. + */ ResultHandler(FsVolumeReference reference, - Set healthyVolumes, - Set failedVolumes, - CountDownLatch latch, + Set healthyVolumes, + Set failedVolumes, + AtomicLong volumeCounter, @Nullable Callback callback) { Preconditions.checkState(reference != null); this.reference = reference; this.healthyVolumes = healthyVolumes; this.failedVolumes = failedVolumes; - this.latch = latch; + this.volumeCounter = volumeCounter; this.callback = callback; - numVolumes = new AtomicLong(latch.getCount()); } @Override @@ -355,13 +393,13 @@ public class DatasetVolumeChecker { private void markHealthy() { synchronized (DatasetVolumeChecker.this) { - healthyVolumes.add(reference.getVolume().getStorageLocation()); + healthyVolumes.add(reference.getVolume()); } } private void markFailed() { synchronized (DatasetVolumeChecker.this) { - failedVolumes.add(reference.getVolume().getStorageLocation()); + failedVolumes.add(reference.getVolume()); } } @@ -372,10 +410,8 @@ public class DatasetVolumeChecker { private void invokeCallback() { try { - latch.countDown(); - - if (numVolumes.decrementAndGet() == 0 && - callback != null) { + final long remaining = volumeCounter.decrementAndGet(); + if (callback != null && remaining == 0) { callback.call(healthyVolumes, failedVolumes); } } catch(Exception e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 30f045f2e1..9e979f7363 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -494,8 +494,9 @@ public interface FsDatasetSpi extends FSDatasetMBean { /** * Check if all the data directories are healthy * @return A set of unhealthy data directories. + * @param failedVolumes */ - Set checkDataDir(); + void handleVolumeFailures(Set failedVolumes); /** * Shutdown the FSDataset diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 35561cd839..0d5a12c183 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -2067,10 +2067,11 @@ class FsDatasetImpl implements FsDatasetSpi { * if some volumes failed - the caller must emove all the blocks that belong * to these failed volumes. * @return the failed volumes. Returns null if no volume failed. + * @param failedVolumes */ @Override // FsDatasetSpi - public Set checkDataDir() { - return volumes.checkDirs(); + public void handleVolumeFailures(Set failedVolumes) { + volumes.handleVolumeFailures(failedVolumes); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index e28ee27def..753c083751 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -959,13 +959,6 @@ public class FsVolumeImpl implements FsVolumeSpi { return cacheExecutor; } - void checkDirs() throws DiskErrorException { - // TODO:FEDERATION valid synchronization - for(BlockPoolSlice s : bpSlices.values()) { - s.checkDirs(); - } - } - @Override public VolumeCheckResult check(VolumeCheckContext ignored) throws DiskErrorException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index cf9c319fdc..64921d7fcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -44,7 +43,6 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.AutoCloseableLock; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.Time; class FsVolumeList { @@ -235,23 +233,14 @@ class FsVolumeList { * Use {@link checkDirsLock} to allow only one instance of checkDirs() call. * * @return list of all the failed volumes. + * @param failedVolumes */ - Set checkDirs() { + void handleVolumeFailures(Set failedVolumes) { try (AutoCloseableLock lock = checkDirsLock.acquire()) { - Set failedLocations = null; - // Make a copy of volumes for performing modification - final List volumeList = getVolumes(); - for(Iterator i = volumeList.iterator(); i.hasNext(); ) { - final FsVolumeImpl fsv = i.next(); + for(FsVolumeSpi vol : failedVolumes) { + FsVolumeImpl fsv = (FsVolumeImpl) vol; try (FsVolumeReference ref = fsv.obtainReference()) { - fsv.checkDirs(); - } catch (DiskErrorException e) { - FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e); - if (failedLocations == null) { - failedLocations = new HashSet<>(1); - } - failedLocations.add(fsv.getStorageLocation()); addVolumeFailureInfo(fsv); removeVolume(fsv); } catch (ClosedChannelException e) { @@ -262,13 +251,7 @@ class FsVolumeList { } } - if (failedLocations != null && failedLocations.size() > 0) { - FsDatasetImpl.LOG.warn("Completed checkDirs. Found " + - failedLocations.size() + " failure volumes."); - } - waitVolumeRemoved(5000, checkDirsLockCondition); - return failedLocations; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java index 476565dc02..b7583c4e97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java @@ -30,9 +30,11 @@ import java.net.URL; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -52,6 +54,8 @@ public class TestBlockStatsMXBean { @Before public void setup() throws IOException { HdfsConfiguration conf = new HdfsConfiguration(); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + 0, TimeUnit.MILLISECONDS); cluster = null; StorageType[][] types = new StorageType[6][]; for (int i=0; i<3; i++) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 484fbe4ce5..8472ecac56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -489,7 +489,17 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override public FsVolumeReference obtainReference() throws ClosedChannelException { - return null; + return new FsVolumeReference() { + @Override + public void close() throws IOException { + // no-op. + } + + @Override + public FsVolumeSpi getVolume() { + return SimulatedVolume.this; + } + }; } @Override @@ -1078,9 +1088,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { } @Override - public Set checkDataDir() { - // nothing to check for simulated data set - return null; + public void handleVolumeFailures(Set failedVolumes) { } @Override // FsDatasetSpi @@ -1349,7 +1357,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override public FsVolumeReferences getFsVolumeReferences() { - throw new UnsupportedOperationException(); + return new FsVolumeReferences(Collections.singletonList(volume)); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index 5607ccc6df..e31e7833d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -62,6 +62,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; @@ -113,6 +114,8 @@ public class TestDataNodeHotSwapVolumes { 1000); /* Allow 1 volume failure */ conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + 0, TimeUnit.MILLISECONDS); MiniDFSNNTopology nnTopology = MiniDFSNNTopology.simpleFederatedTopology(numNameNodes); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 8db76580b9..06e287144f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -33,6 +33,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; @@ -118,6 +119,8 @@ public class TestDataNodeVolumeFailure { // Allow a single volume failure (there are two volumes) conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + 0, TimeUnit.MILLISECONDS); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build(); cluster.waitActive(); fs = cluster.getFileSystem(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java index aa9b7aa0e1..3d37b10bc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java @@ -30,6 +30,7 @@ import java.io.File; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -659,6 +660,8 @@ public class TestDataNodeVolumeFailureReporting { conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000); conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, failedVolumesTolerated); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + 0, TimeUnit.MILLISECONDS); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes) .storagesPerDatanode(storagesPerDatanode).build(); cluster.waitActive(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java index 5ff7d9b06b..de50ccb849 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -144,6 +145,8 @@ public class TestDataNodeVolumeFailureToleration { // Bring up two additional datanodes that need both of their volumes // functioning in order to stay up. conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + 0, TimeUnit.MILLISECONDS); cluster.startDataNodes(conf, 2, true, null, null); cluster.waitActive(); final DatanodeManager dm = cluster.getNamesystem().getBlockManager( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index 56dee43a3b..cd86720ef8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -26,7 +26,9 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.net.InetSocketAddress; import java.net.Socket; +import java.util.concurrent.TimeUnit; +import com.google.common.base.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -49,8 +51,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -69,6 +71,9 @@ public class TestDiskError { public void setUp() throws Exception { conf = new HdfsConfiguration(); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L); + conf.setTimeDuration( + DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + 0, TimeUnit.MILLISECONDS); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); fs = cluster.getFileSystem(); @@ -213,19 +218,22 @@ public class TestDiskError { * Before refactoring the code the above function was not getting called * @throws IOException, InterruptedException */ - @Test - public void testcheckDiskError() throws IOException, InterruptedException { + @Test(timeout=60000) + public void testcheckDiskError() throws Exception { if(cluster.getDataNodes().size() <= 0) { cluster.startDataNodes(conf, 1, true, null, null); cluster.waitActive(); } DataNode dataNode = cluster.getDataNodes().get(0); - long slackTime = dataNode.checkDiskErrorInterval/2; //checking for disk error - dataNode.checkDiskErrorAsync(); - Thread.sleep(dataNode.checkDiskErrorInterval); - long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck(); - assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((Time.monotonicNow()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime))); + final long lastCheckTimestamp = dataNode.getLastDiskErrorCheck(); + dataNode.checkDiskError(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return dataNode.getLastDiskErrorCheck() > lastCheckTimestamp; + } + }, 100, 60000); } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java index fa809d19ae..50096bae24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java @@ -35,7 +35,10 @@ import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -103,8 +106,8 @@ public class TestDatasetVolumeChecker { */ checker.checkVolume(volume, new DatasetVolumeChecker.Callback() { @Override - public void call(Set healthyVolumes, - Set failedVolumes) { + public void call(Set healthyVolumes, + Set failedVolumes) { numCallbackInvocations.incrementAndGet(); if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) { assertThat(healthyVolumes.size(), is(1)); @@ -138,7 +141,7 @@ public class TestDatasetVolumeChecker { new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer()); checker.setDelegateChecker(new DummyChecker()); - Set failedVolumes = checker.checkAllVolumes(dataset); + Set failedVolumes = checker.checkAllVolumes(dataset); LOG.info("Got back {} failed volumes", failedVolumes.size()); if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) { @@ -174,8 +177,8 @@ public class TestDatasetVolumeChecker { dataset, new DatasetVolumeChecker.Callback() { @Override public void call( - Set healthyVolumes, - Set failedVolumes) { + Set healthyVolumes, + Set failedVolumes) { LOG.info("Got back {} failed volumes", failedVolumes.size()); if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) { @@ -236,7 +239,7 @@ public class TestDatasetVolumeChecker { return dataset; } - private static List makeVolumes( + static List makeVolumes( int numVolumes, VolumeCheckResult health) throws Exception { final List volumes = new ArrayList<>(numVolumes); for (int i = 0; i < numVolumes; ++i) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java index b57d84f70b..16c333b4f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; import org.apache.hadoop.util.FakeTimer; +import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -47,6 +48,19 @@ public class TestDatasetVolumeCheckerFailures { public static final Logger LOG =LoggerFactory.getLogger( TestDatasetVolumeCheckerFailures.class); + private FakeTimer timer; + private Configuration conf; + + private static final long MIN_DISK_CHECK_GAP_MS = 1000; // 1 second. + + @Before + public void commonInit() { + timer = new FakeTimer(); + conf = new HdfsConfiguration(); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + MIN_DISK_CHECK_GAP_MS, TimeUnit.MILLISECONDS); + } + /** * Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}. * @throws Exception @@ -61,14 +75,13 @@ public class TestDatasetVolumeCheckerFailures { TestDatasetVolumeChecker.makeDataset(volumes); // Create a disk checker with a very low timeout. - final HdfsConfiguration conf = new HdfsConfiguration(); conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, 1, TimeUnit.SECONDS); final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, new FakeTimer()); // Ensure that the hung volume is detected as failed. - Set failedVolumes = checker.checkAllVolumes(dataset); + Set failedVolumes = checker.checkAllVolumes(dataset); assertThat(failedVolumes.size(), is(1)); } @@ -86,10 +99,10 @@ public class TestDatasetVolumeCheckerFailures { final FsDatasetSpi dataset = TestDatasetVolumeChecker.makeDataset(volumes); - DatasetVolumeChecker checker = - new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer()); - Set failedVolumes = checker.checkAllVolumes(dataset); + DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer); + Set failedVolumes = checker.checkAllVolumes(dataset); assertThat(failedVolumes.size(), is(0)); + assertThat(checker.getNumSyncDatasetChecks(), is(0L)); // The closed volume should not have been checked as it cannot // be referenced. @@ -98,13 +111,10 @@ public class TestDatasetVolumeCheckerFailures { @Test(timeout=60000) public void testMinGapIsEnforcedForSyncChecks() throws Exception { + final List volumes = + TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY); final FsDatasetSpi dataset = - TestDatasetVolumeChecker.makeDataset(Collections.emptyList()); - final FakeTimer timer = new FakeTimer(); - final Configuration conf = new HdfsConfiguration(); - final long minGapMs = 100; - conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, - minGapMs, TimeUnit.MILLISECONDS); + TestDatasetVolumeChecker.makeDataset(volumes); final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer); checker.checkAllVolumes(dataset); @@ -116,7 +126,7 @@ public class TestDatasetVolumeCheckerFailures { assertThat(checker.getNumSkippedChecks(), is(1L)); // Re-check after advancing the timer. Ensure the check is performed. - timer.advance(minGapMs); + timer.advance(MIN_DISK_CHECK_GAP_MS); checker.checkAllVolumes(dataset); assertThat(checker.getNumSyncDatasetChecks(), is(2L)); assertThat(checker.getNumSkippedChecks(), is(1L)); @@ -124,13 +134,10 @@ public class TestDatasetVolumeCheckerFailures { @Test(timeout=60000) public void testMinGapIsEnforcedForASyncChecks() throws Exception { + final List volumes = + TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY); final FsDatasetSpi dataset = - TestDatasetVolumeChecker.makeDataset(Collections.emptyList()); - final FakeTimer timer = new FakeTimer(); - final Configuration conf = new HdfsConfiguration(); - final long minGapMs = 100; - conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, - minGapMs, TimeUnit.MILLISECONDS); + TestDatasetVolumeChecker.makeDataset(volumes); final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer); checker.checkAllVolumesAsync(dataset, null); @@ -142,7 +149,7 @@ public class TestDatasetVolumeCheckerFailures { assertThat(checker.getNumSkippedChecks(), is(1L)); // Re-check after advancing the timer. Ensure the check is performed. - timer.advance(minGapMs); + timer.advance(MIN_DISK_CHECK_GAP_MS); checker.checkAllVolumesAsync(dataset, null); assertThat(checker.getNumAsyncDatasetChecks(), is(2L)); assertThat(checker.getNumSkippedChecks(), is(1L)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 5cd86e29c4..62ef731e32 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -239,8 +239,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public Set checkDataDir() { - return null; + public void handleVolumeFailures(Set failedVolumes) { } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index e48aae0a55..905c3f0c7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -52,13 +52,10 @@ import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.StringUtils; import org.junit.Assert; @@ -66,8 +63,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.File; import java.io.FileOutputStream; @@ -76,16 +71,18 @@ import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -94,13 +91,10 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import org.slf4j.Logger; @@ -339,68 +333,6 @@ public class TestFsDatasetImpl { assertEquals(numExistingVolumes, getNumVolumes()); } - @Test(timeout = 5000) - public void testChangeVolumeWithRunningCheckDirs() throws IOException { - RoundRobinVolumeChoosingPolicy blockChooser = - new RoundRobinVolumeChoosingPolicy<>(); - conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); - final BlockScanner blockScanner = new BlockScanner(datanode); - final FsVolumeList volumeList = new FsVolumeList( - Collections.emptyList(), blockScanner, blockChooser); - final List oldVolumes = new ArrayList<>(); - - // Initialize FsVolumeList with 5 mock volumes. - final int NUM_VOLUMES = 5; - for (int i = 0; i < NUM_VOLUMES; i++) { - FsVolumeImpl volume = mock(FsVolumeImpl.class); - oldVolumes.add(volume); - when(volume.getStorageLocation()).thenReturn( - StorageLocation.parse(new File("data" + i).toURI().toString())); - when(volume.checkClosed()).thenReturn(true); - FsVolumeReference ref = mock(FsVolumeReference.class); - when(ref.getVolume()).thenReturn(volume); - volumeList.addVolume(ref); - } - - // When call checkDirs() on the 2nd volume, anther "thread" removes the 5th - // volume and add another volume. It does not affect checkDirs() running. - final FsVolumeImpl newVolume = mock(FsVolumeImpl.class); - final FsVolumeReference newRef = mock(FsVolumeReference.class); - when(newRef.getVolume()).thenReturn(newVolume); - when(newVolume.getStorageLocation()).thenReturn( - StorageLocation.parse(new File("data4").toURI().toString())); - FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) - throws Throwable { - volumeList.removeVolume( - StorageLocation.parse((new File("data4")).toURI().toString()), - false); - volumeList.addVolume(newRef); - return null; - } - }).when(blockedVolume).checkDirs(); - - FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2); - doThrow(new DiskChecker.DiskErrorException("broken")) - .when(brokenVolume).checkDirs(); - - volumeList.checkDirs(); - - // Since FsVolumeImpl#checkDirs() get a snapshot of the list of volumes - // before running removeVolume(), it is supposed to run checkDirs() on all - // the old volumes. - for (FsVolumeImpl volume : oldVolumes) { - verify(volume).checkDirs(); - } - // New volume is not visible to checkDirs() process. - verify(newVolume, never()).checkDirs(); - assertTrue(volumeList.getVolumes().contains(newVolume)); - assertFalse(volumeList.getVolumes().contains(brokenVolume)); - assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size()); - } - @Test public void testAddVolumeFailureReleasesInUseLock() throws IOException { FsDatasetImpl spyDataset = spy(dataset); @@ -717,6 +649,9 @@ public class TestFsDatasetImpl { Configuration config = new HdfsConfiguration(); config.setLong( DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000); + config.setTimeDuration( + DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0, + TimeUnit.MILLISECONDS); config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); @@ -734,6 +669,8 @@ public class TestFsDatasetImpl { getVolume(block); File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem() .getBlockPoolId()); + LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0); + DatanodeInfo info = lb.getLocations()[0]; if (finalizedDir.exists()) { // Remove write and execute access so that checkDiskErrorThread detects @@ -744,15 +681,14 @@ public class TestFsDatasetImpl { Assert.assertTrue("Reference count for the volume should be greater " + "than 0", volume.getReferenceCount() > 0); // Invoke the synchronous checkDiskError method - dataNode.getFSDataset().checkDataDir(); + dataNode.checkDiskError(); // Sleep for 1 second so that datanode can interrupt and cluster clean up GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { return volume.getReferenceCount() == 0; } }, 100, 10); - LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0); - DatanodeInfo info = lb.getLocations()[0]; + assertThat(dataNode.getFSDataset().getNumFailedVolumes(), is(1)); try { out.close(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 6eff300feb..83c15caf63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -108,43 +108,6 @@ public class TestFsVolumeList { } } - @Test(timeout=30000) - public void testCheckDirsWithClosedVolume() throws IOException { - FsVolumeList volumeList = new FsVolumeList( - Collections.emptyList(), blockScanner, blockChooser); - final List volumes = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - File curDir = new File(baseDir, "volume-" + i); - curDir.mkdirs(); - FsVolumeImpl volume = new FsVolumeImplBuilder() - .setConf(conf) - .setDataset(dataset) - .setStorageID("storage-id") - .setStorageDirectory( - new StorageDirectory(StorageLocation.parse(curDir.getPath()))) - .build(); - volumes.add(volume); - volumeList.addVolume(volume.obtainReference()); - } - - // Close the 2nd volume. - volumes.get(1).setClosed(); - try { - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - return volumes.get(1).checkClosed(); - } - }, 100, 3000); - } catch (TimeoutException e) { - fail("timed out while waiting for volume to be removed."); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - } - // checkDirs() should ignore the 2nd volume since it is closed. - volumeList.checkDirs(); - } - @Test(timeout=30000) public void testReleaseVolumeRefIfNoBlockScanner() throws IOException { FsVolumeList volumeList = new FsVolumeList(