HDDS-989. Check Hdds Volumes for errors. Contributed by Arpit Agarwal.

Arpit Agarwal 2019-01-27 11:18:30 -08:00
parent 47d6b9bb7f
commit 3b49d7aeae
9 changed files with 2643 additions and 10 deletions

View File

@ -20,17 +20,23 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.GetSpaceUsed;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
import org.apache.hadoop.ozone.container.common.helpers.DatanodeVersionFile;
import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.Time;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,7 +64,10 @@
* During DN startup, if the VERSION file exists, we verify that the
* clusterID in the version file matches the clusterID from SCM.
*/
public final class HddsVolume {
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class HddsVolume
implements Checkable<Boolean, VolumeCheckResult> {
private static final Logger LOG = LoggerFactory.getLogger(HddsVolume.class);
@ -76,6 +85,19 @@ public final class HddsVolume {
private long cTime; // creation time of the file system state
private int layoutVersion; // layout version of the storage data
/**
* Run a check on the current volume to determine if it is healthy.
* @param unused context for the check, ignored.
* @return result of checking the volume.
* @throws Exception if an exception was encountered while running
* the volume check.
*/
@Override
public VolumeCheckResult check(@Nullable Boolean unused) throws Exception {
DiskChecker.checkDir(hddsRootDir);
return VolumeCheckResult.HEALTHY;
}
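(Editor's note, not part of the change.) The new check() delegates entirely to org.apache.hadoop.util.DiskChecker. The sketch below illustrates that primitive under the assumed behaviour of DiskChecker.checkDir: the directory is created if missing and must be readable, writable and executable, otherwise a DiskErrorException is thrown; when that exception escapes check(), the checker's ResultHandler marks the volume as failed. The class name is hypothetical.

import java.io.File;

import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;

/** Editor's sketch, not part of the commit: the primitive check() relies on. */
public final class DiskCheckSketch {
  public static void main(String[] args) {
    // checkDir creates the directory if it is missing and verifies that it is
    // readable, writable and executable; any problem is reported as a
    // DiskErrorException, which the volume checker treats as a failed volume.
    File dir = new File(System.getProperty("java.io.tmpdir"), "hdds-check-demo");
    try {
      DiskChecker.checkDir(dir);
      System.out.println(dir + " passed the disk check");
    } catch (DiskErrorException e) {
      System.out.println(dir + " failed the disk check: " + e);
    }
  }
}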
/**
* Builder for HddsVolume.
*/

View File

@ -0,0 +1,418 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.volume;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY;
/**
* A class that encapsulates running disk checks against each HDDS volume and
* allows retrieving a list of failed volumes.
*/
public class HddsVolumeChecker {
public static final Logger LOG =
LoggerFactory.getLogger(HddsVolumeChecker.class);
private AsyncChecker<Boolean, VolumeCheckResult> delegateChecker;
private final AtomicLong numVolumeChecks = new AtomicLong(0);
private final AtomicLong numAllVolumeChecks = new AtomicLong(0);
private final AtomicLong numSkippedChecks = new AtomicLong(0);
/**
* Max allowed time for a disk check in milliseconds. If the check
* doesn't complete within this time we declare the disk as dead.
*/
private final long maxAllowedTimeForCheckMs;
/**
* Minimum time between two successive disk checks of a volume.
*/
private final long minDiskCheckGapMs;
/**
* Timestamp of the last check of all volumes.
*/
private long lastAllVolumesCheck;
private final Timer timer;
private final ExecutorService checkVolumeResultHandlerExecutorService;
/**
* @param conf Configuration object.
* @param timer {@link Timer} object used for throttling checks.
*/
public HddsVolumeChecker(Configuration conf, Timer timer)
throws DiskErrorException {
maxAllowedTimeForCheckMs = conf.getTimeDuration(
DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
if (maxAllowedTimeForCheckMs <= 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ maxAllowedTimeForCheckMs + " (should be > 0)");
}
this.timer = timer;
/**
* Maximum number of volume failures that can be tolerated without
* declaring a fatal error.
*/
int maxVolumeFailuresTolerated = conf.getInt(
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
minDiskCheckGapMs = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
TimeUnit.MILLISECONDS);
if (minDiskCheckGapMs < 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY + " - "
+ minDiskCheckGapMs + " (should be >= 0)");
}
long diskCheckTimeout = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
if (diskCheckTimeout < 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ diskCheckTimeout + " (should be >= 0)");
}
lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
if (maxVolumeFailuresTolerated < DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "
+ maxVolumeFailuresTolerated + " "
+ DataNode.MAX_VOLUME_FAILURES_TOLERATED_MSG);
}
delegateChecker = new ThrottledAsyncChecker<>(
timer, minDiskCheckGapMs, diskCheckTimeout,
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("DataNode DiskChecker thread %d")
.setDaemon(true)
.build()));
checkVolumeResultHandlerExecutorService = Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("VolumeCheck ResultHandler thread %d")
.setDaemon(true)
.build());
}
/**
* Run checks against all HDDS volumes.
*
* This check may be performed at service startup and subsequently at
* regular intervals to detect and handle failed volumes.
*
* @param volumes - Set of volumes to be checked. This set must be immutable
* for the duration of the check else the results will be
* unexpected.
*
* @return set of failed volumes.
*/
public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
throws InterruptedException {
final long gap = timer.monotonicNow() - lastAllVolumesCheck;
if (gap < minDiskCheckGapMs) {
numSkippedChecks.incrementAndGet();
LOG.trace(
"Skipped checking all volumes, time since last check {} is less " +
"than the minimum gap between checks ({} ms).",
gap, minDiskCheckGapMs);
return Collections.emptySet();
}
lastAllVolumesCheck = timer.monotonicNow();
final Set<HddsVolume> healthyVolumes = new HashSet<>();
final Set<HddsVolume> failedVolumes = new HashSet<>();
final Set<HddsVolume> allVolumes = new HashSet<>();
final AtomicLong numVolumes = new AtomicLong(volumes.size());
final CountDownLatch latch = new CountDownLatch(1);
for (HddsVolume v : volumes) {
Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(v, null);
LOG.info("Scheduled health check for volume {}", v);
if (olf.isPresent()) {
allVolumes.add(v);
Futures.addCallback(olf.get(),
new ResultHandler(v, healthyVolumes, failedVolumes,
numVolumes, (ignored1, ignored2) -> latch.countDown()));
} else {
if (numVolumes.decrementAndGet() == 0) {
latch.countDown();
}
}
}
// Wait until our timeout elapses, after which we give up on
// the remaining volumes.
if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
LOG.warn("checkAllVolumes timed out after {} ms" +
maxAllowedTimeForCheckMs);
}
numAllVolumeChecks.incrementAndGet();
synchronized (this) {
// All volumes that have not been detected as healthy should be
// considered failed. This is a superset of 'failedVolumes'.
//
// Make a copy under the mutex as Sets.difference() returns a view
// of a potentially changing set.
return new HashSet<>(Sets.difference(allVolumes, healthyVolumes));
}
}
/**
* A callback interface that is supplied the result of running an
* async disk check on multiple volumes.
*/
public interface Callback {
/**
* @param healthyVolumes set of volumes that passed disk checks.
* @param failedVolumes set of volumes that failed disk checks.
*/
void call(Set<HddsVolume> healthyVolumes,
Set<HddsVolume> failedVolumes);
}
/**
* Check a single volume asynchronously, notifying the given callback
* once the check completes.
*
* If the volume is null then no check is scheduled and no error is
* propagated to the callback.
*
* @param volume the volume that is to be checked.
* @param callback callback to be invoked when the volume check completes.
* @return true if the check was scheduled and the callback will be invoked.
* false otherwise.
*/
public boolean checkVolume(final HddsVolume volume, Callback callback) {
if (volume == null) {
LOG.debug("Cannot schedule check on null volume");
return false;
}
Optional<ListenableFuture<VolumeCheckResult>> olf =
delegateChecker.schedule(volume, null);
if (olf.isPresent()) {
numVolumeChecks.incrementAndGet();
Futures.addCallback(olf.get(),
new ResultHandler(volume, new HashSet<>(), new HashSet<>(),
new AtomicLong(1), callback),
checkVolumeResultHandlerExecutorService
);
return true;
}
return false;
}
/**
* A callback to process the results of checking a volume.
*/
private class ResultHandler
implements FutureCallback<VolumeCheckResult> {
private final HddsVolume volume;
private final Set<HddsVolume> failedVolumes;
private final Set<HddsVolume> healthyVolumes;
private final AtomicLong volumeCounter;
@Nullable
private final Callback callback;
/**
*
* @param healthyVolumes set of healthy volumes. If the disk check is
* successful, add the volume here.
* @param failedVolumes set of failed volumes. If the disk check fails,
* add the volume here.
* @param volumeCounter volumeCounter used to trigger callback invocation.
* @param callback invoked when the volumeCounter reaches 0.
*/
ResultHandler(HddsVolume volume,
Set<HddsVolume> healthyVolumes,
Set<HddsVolume> failedVolumes,
AtomicLong volumeCounter,
@Nullable Callback callback) {
this.volume = volume;
this.healthyVolumes = healthyVolumes;
this.failedVolumes = failedVolumes;
this.volumeCounter = volumeCounter;
this.callback = callback;
}
@Override
public void onSuccess(@Nonnull VolumeCheckResult result) {
switch(result) {
case HEALTHY:
case DEGRADED:
LOG.debug("Volume {} is {}.", volume, result);
markHealthy();
break;
case FAILED:
LOG.warn("Volume {} detected as being unhealthy", volume);
markFailed();
break;
default:
LOG.error("Unexpected health check result {} for volume {}",
result, volume);
markHealthy();
break;
}
cleanup();
}
@Override
public void onFailure(@Nonnull Throwable t) {
Throwable exception = (t instanceof ExecutionException) ?
t.getCause() : t;
LOG.warn("Exception running disk checks against volume " +
volume, exception);
markFailed();
cleanup();
}
private void markHealthy() {
synchronized (HddsVolumeChecker.this) {
healthyVolumes.add(volume);
}
}
private void markFailed() {
synchronized (HddsVolumeChecker.this) {
failedVolumes.add(volume);
}
}
private void cleanup() {
invokeCallback();
}
private void invokeCallback() {
try {
final long remaining = volumeCounter.decrementAndGet();
if (callback != null && remaining == 0) {
callback.call(healthyVolumes, failedVolumes);
}
} catch(Exception e) {
// Propagating this exception is unlikely to be helpful.
LOG.warn("Unexpected exception", e);
}
}
}
/**
* Shutdown the checker and its associated ExecutorService.
*
* See {@link ExecutorService#awaitTermination} for the interpretation
* of the parameters.
*/
void shutdownAndWait(int gracePeriod, TimeUnit timeUnit) {
try {
delegateChecker.shutdownAndWait(gracePeriod, timeUnit);
} catch (InterruptedException e) {
LOG.warn("{} interrupted during shutdown.", this.getClass().getSimpleName());
Thread.currentThread().interrupt();
}
}
/**
* This method is for testing only.
*
* @param testDelegate the delegate checker to substitute, for tests.
*/
@VisibleForTesting
void setDelegateChecker(
AsyncChecker<Boolean, VolumeCheckResult> testDelegate) {
delegateChecker = testDelegate;
}
/**
* Return the number of {@link #checkVolume} invocations.
*/
public long getNumVolumeChecks() {
return numVolumeChecks.get();
}
/**
* Return the number of {@link #checkAllVolumes} invocations.
*/
public long getNumAllVolumeChecks() {
return numAllVolumeChecks.get();
}
/**
* Return the number of checks skipped because the minimum gap since the
* last check had not elapsed.
*/
public long getNumSkippedChecks() {
return numSkippedChecks.get();
}
}
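(Editor's note, not part of the change.) A minimal usage sketch of the checker follows. It sits in the same package because shutdownAndWait is package-private, and it mocks HddsVolume with Mockito much as TestHddsVolumeChecker does further down; the sketch's own class and method names are made up for illustration. The timeout and minimum-gap settings are read from the existing HDFS keys (DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY), so under the default gap the second request in the sketch is throttled away.

package org.apache.hadoop.ozone.container.common.volume;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.util.Timer;
import org.mockito.Mockito;

/** Editor's sketch, not part of the commit: driving HddsVolumeChecker. */
public final class HddsVolumeCheckerSketch {
  public static void main(String[] args) throws Exception {
    // A mocked volume whose check() always reports HEALTHY.
    HddsVolume volume = Mockito.mock(HddsVolume.class);
    Mockito.when(volume.check(null)).thenReturn(VolumeCheckResult.HEALTHY);

    HddsVolumeChecker checker =
        new HddsVolumeChecker(new OzoneConfiguration(), new Timer());

    // Synchronous, parallel check of all volumes; failed volumes come back.
    Set<HddsVolume> failed =
        checker.checkAllVolumes(Collections.singletonList(volume));
    System.out.println("failed volumes: " + failed);             // []

    // A repeat check of the same volume within the minimum gap is throttled:
    // the delegate returns absent and checkVolume() returns false.
    boolean scheduled = checker.checkVolume(volume,
        (healthy, failedSet) -> System.out.println(
            "healthy=" + healthy + " failed=" + failedSet));
    System.out.println("second check scheduled: " + scheduled);  // false

    checker.shutdownAndWait(1, TimeUnit.SECONDS);
  }
}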

View File

@ -0,0 +1,245 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.volume;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* An implementation of {@link AsyncChecker} that skips checking recently
* checked objects. It will enforce at least minMsBetweenChecks
* milliseconds between two successive checks of any one object.
*
* It is assumed that the total number of Checkable objects in the system
* is small (not more than a few dozen), since the checker uses O(Checkables)
* storage and also potentially O(Checkables) threads.
*
* minMsBetweenChecks should be configured reasonably
* by the caller to avoid spinning up too many threads frequently.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
public static final Logger LOG =
LoggerFactory.getLogger(ThrottledAsyncChecker.class);
private final Timer timer;
/**
* The ExecutorService used to schedule asynchronous checks.
*/
private final ListeningExecutorService executorService;
private final ScheduledExecutorService scheduledExecutorService;
/**
* The minimum gap in milliseconds between two successive checks
* of the same object. This is the throttle.
*/
private final long minMsBetweenChecks;
private final long diskCheckTimeout;
/**
* Map of checks that are currently in progress. Protected by the object
* lock.
*/
private final Map<Checkable, ListenableFuture<V>> checksInProgress;
/**
* Maps Checkable objects to a future that can be used to retrieve
* the results of the operation.
* Protected by the object lock.
*/
private final Map<Checkable, ThrottledAsyncChecker.LastCheckResult<V>> completedChecks;
public ThrottledAsyncChecker(final Timer timer,
final long minMsBetweenChecks,
final long diskCheckTimeout,
final ExecutorService executorService) {
this.timer = timer;
this.minMsBetweenChecks = minMsBetweenChecks;
this.diskCheckTimeout = diskCheckTimeout;
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.checksInProgress = new HashMap<>();
this.completedChecks = new WeakHashMap<>();
if (this.diskCheckTimeout > 0) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new
ScheduledThreadPoolExecutor(1);
this.scheduledExecutorService = MoreExecutors
.getExitingScheduledExecutorService(scheduledThreadPoolExecutor);
} else {
this.scheduledExecutorService = null;
}
}
/**
* See {@link AsyncChecker#schedule}
*
* If the object has been checked recently, or a check for it is still
* in progress, then the new request is skipped and an absent Optional
* is returned.
*/
@Override
public Optional<ListenableFuture<V>> schedule(
Checkable<K, V> target, K context) {
if (checksInProgress.containsKey(target)) {
return Optional.absent();
}
if (completedChecks.containsKey(target)) {
final ThrottledAsyncChecker.LastCheckResult<V> result = completedChecks.get(target);
final long msSinceLastCheck = timer.monotonicNow() - result.completedAt;
if (msSinceLastCheck < minMsBetweenChecks) {
LOG.debug("Skipped checking {}. Time since last check {}ms " +
"is less than the min gap {}ms.",
target, msSinceLastCheck, minMsBetweenChecks);
return Optional.absent();
}
}
LOG.info("Scheduling a check for {}", target);
final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
() -> target.check(context));
final ListenableFuture<V> lf;
if (diskCheckTimeout > 0) {
lf = TimeoutFuture
.create(lfWithoutTimeout, diskCheckTimeout, TimeUnit.MILLISECONDS,
scheduledExecutorService);
} else {
lf = lfWithoutTimeout;
}
checksInProgress.put(target, lf);
addResultCachingCallback(target, lf);
return Optional.of(lf);
}
/**
* Register a callback to cache the result of a check.
* @param target the object that was checked.
* @param lf the future tracking the in-flight check.
*/
private void addResultCachingCallback(
Checkable<K, V> target, ListenableFuture<V> lf) {
Futures.addCallback(lf, new FutureCallback<V>() {
@Override
public void onSuccess(@Nullable V result) {
synchronized (ThrottledAsyncChecker.this) {
checksInProgress.remove(target);
completedChecks.put(target, new LastCheckResult<>(
result, timer.monotonicNow()));
}
}
@Override
public void onFailure(@Nonnull Throwable t) {
synchronized (ThrottledAsyncChecker.this) {
checksInProgress.remove(target);
completedChecks.put(target, new LastCheckResult<>(
t, timer.monotonicNow()));
}
}
});
}
/**
* {@inheritDoc}.
*
* The results of in-progress checks are not useful during shutdown,
* so we optimize for faster shutdown by interrupting all actively
* executing checks.
*/
@Override
public void shutdownAndWait(long timeout, TimeUnit timeUnit)
throws InterruptedException {
if (scheduledExecutorService != null) {
scheduledExecutorService.shutdownNow();
scheduledExecutorService.awaitTermination(timeout, timeUnit);
}
executorService.shutdownNow();
executorService.awaitTermination(timeout, timeUnit);
}
/**
* Status of running a check. It can either be a result or an
* exception, depending on whether the check completed or threw.
*/
private static final class LastCheckResult<V> {
/**
* Timestamp at which the check completed.
*/
private final long completedAt;
/**
* Result of running the check if it completed. null if it threw.
*/
@Nullable
private final V result;
/**
* Exception thrown by the check. null if it returned a result.
*/
private final Throwable exception; // null on success.
/**
* Initialize with a result.
* @param result
*/
private LastCheckResult(V result, long completedAt) {
this.result = result;
this.exception = null;
this.completedAt = completedAt;
}
/**
* Initialize with an exception.
* @param t the exception thrown by the check.
* @param completedAt timestamp at which the check completed.
*/
private LastCheckResult(Throwable t, long completedAt) {
this.result = null;
this.exception = t;
this.completedAt = completedAt;
}
}
}
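(Editor's note, not part of the change.) A compact sketch of the throttling contract described in the class comment: the first schedule() of a target returns a future, while a second request within minMsBetweenChecks, or while a check is still in flight, is skipped. In tests, an org.apache.hadoop.util.FakeTimer can be advanced past the gap to make the same target eligible again. The sketch's class name is hypothetical.

package org.apache.hadoop.ozone.container.common.volume;

import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.util.Timer;

/** Editor's sketch, not part of the commit: the throttling contract. */
public final class ThrottledAsyncCheckerSketch {
  public static void main(String[] args) throws Exception {
    // A trivial Checkable that always passes.
    Checkable<Void, Boolean> target = new Checkable<Void, Boolean>() {
      @Override
      public Boolean check(Void context) {
        return true;
      }
    };

    ThrottledAsyncChecker<Void, Boolean> checker = new ThrottledAsyncChecker<>(
        new Timer(),
        TimeUnit.MINUTES.toMillis(10),   // min gap between checks of one target
        0,                               // 0 disables the per-check timeout
        Executors.newSingleThreadExecutor());

    Optional<ListenableFuture<Boolean>> first = checker.schedule(target, null);
    System.out.println("first scheduled: " + first.isPresent());   // true
    System.out.println("result: " + first.get().get());            // true

    // A second request within minMsBetweenChecks is skipped entirely.
    Optional<ListenableFuture<Boolean>> second = checker.schedule(target, null);
    System.out.println("second scheduled: " + second.isPresent()); // false

    checker.shutdownAndWait(1, TimeUnit.SECONDS);
  }
}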

View File

@ -0,0 +1,161 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* Some portions of this class have been modified to make it functional in this
* package.
*/
package org.apache.hadoop.ozone.container.common.volume;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
/**
* Implementation of {@code Futures#withTimeout}.
* <p>
* Future that delegates to another but will finish early (via a
* {@link TimeoutException} wrapped in an {@link ExecutionException}) if the
* specified duration expires. The delegate future is interrupted and
* cancelled if it times out.
*/
final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> {
public static final Logger LOG = LoggerFactory.getLogger(
TimeoutFuture.class);
static <V> ListenableFuture<V> create(
ListenableFuture<V> delegate,
long time,
TimeUnit unit,
ScheduledExecutorService scheduledExecutor) {
TimeoutFuture<V> result = new TimeoutFuture<V>(delegate);
TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result);
result.timer = scheduledExecutor.schedule(fire, time, unit);
delegate.addListener(fire, directExecutor());
return result;
}
/*
* Memory visibility of these fields. There are two cases to consider.
*
* 1. visibility of the writes to these fields to Fire.run:
*
* The initial write to delegateRef is made definitely visible via the
* semantics of addListener/SES.schedule. The later racy write in cancel()
* is not guaranteed to be observed, however that is fine since the
* correctness is based on the atomic state in our base class. The initial
* write to timer is never definitely visible to Fire.run since it is
* assigned after SES.schedule is called. Therefore Fire.run has to check
* for null. However, it should be visible if Fire.run is called by
* delegate.addListener since addListener is called after the assignment
* to timer, and importantly this is the main situation in which we need to
* be able to see the write.
*
* 2. visibility of the writes to an afterDone() call triggered by cancel():
*
* Since these fields are non-final that means that TimeoutFuture is not
* being 'safely published', thus a motivated caller may be able to expose
* the reference to another thread that would then call cancel() and be
* unable to cancel the delegate. There are a number of ways to solve this,
* none of which are very pretty, and it is currently believed to be a
* purely theoretical problem (since the other actions should supply
* sufficient write-barriers).
*/
@Nullable private ListenableFuture<V> delegateRef;
@Nullable private Future<?> timer;
private TimeoutFuture(ListenableFuture<V> delegate) {
this.delegateRef = Preconditions.checkNotNull(delegate);
}
/**
* A runnable that is called when the delegate or the timer completes.
*/
private static final class Fire<V> implements Runnable {
@Nullable
TimeoutFuture<V> timeoutFutureRef;
Fire(
TimeoutFuture<V> timeoutFuture) {
this.timeoutFutureRef = timeoutFuture;
}
@Override
public void run() {
// If either of these reads return null then we must be after a
// successful cancel or another call to this method.
TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
if (timeoutFuture == null) {
return;
}
ListenableFuture<V> delegate = timeoutFuture.delegateRef;
if (delegate == null) {
return;
}
/*
* If we're about to complete the TimeoutFuture, we want to release our
* reference to it. Otherwise, we'll pin it (and its result) in memory
* until the timeout task is GCed. (The need to clear our reference to
* the TimeoutFuture is the reason we use a *static* nested class with
* a manual reference back to the "containing" class.)
*
* This has the nice-ish side effect of limiting reentrancy: run() calls
* timeoutFuture.setException() calls run(). That reentrancy would
* already be harmless, since timeoutFuture can be set (and delegate
* cancelled) only once. (And "set only once" is important for other
* reasons: run() can still be invoked concurrently in different threads,
* even with the above null checks.)
*/
timeoutFutureRef = null;
if (delegate.isDone()) {
timeoutFuture.setFuture(delegate);
} else {
try {
timeoutFuture.setException(
new TimeoutException("Future timed out: " + delegate));
} finally {
delegate.cancel(true);
}
}
}
}
@Override
protected void afterDone() {
maybePropagateCancellation(delegateRef);
Future<?> localTimer = timer;
// Try to cancel the timer as an optimization.
// timer may be null if this call to run was by the timer task since there
// is no happens-before edge between the assignment to timer and an
// execution of the timer task.
if (localTimer != null) {
localTimer.cancel(false);
}
delegateRef = null;
timer = null;
}
}
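(Editor's note, not part of the change.) A small sketch of the behaviour described above, placed in the same package because create() is package-private: a delegate that overruns the timeout completes with a TimeoutException wrapped in an ExecutionException, and the delegate itself is interrupted and cancelled. This is how ThrottledAsyncChecker bounds the duration of an individual disk check when diskCheckTimeout is positive. The sketch's class name is hypothetical.

package org.apache.hadoop.ozone.container.common.volume;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/** Editor's sketch, not part of the commit: how a timed-out check surfaces. */
public final class TimeoutFutureSketch {
  public static void main(String[] args) throws Exception {
    ListeningExecutorService worker =
        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    // A deliberately slow "check" that exceeds the 100 ms timeout.
    ListenableFuture<String> slow = worker.submit(() -> {
      Thread.sleep(10_000);
      return "done";
    });
    ListenableFuture<String> bounded =
        TimeoutFuture.create(slow, 100, TimeUnit.MILLISECONDS, scheduler);

    try {
      bounded.get();
    } catch (ExecutionException e) {
      // The TimeoutException arrives wrapped in an ExecutionException; the
      // slow delegate is interrupted and cancelled by the Fire task.
      System.out.println("timed out: "
          + (e.getCause() instanceof TimeoutException));
    }

    worker.shutdownNow();
    scheduler.shutdownNow();
  }
}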

View File

@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@ -37,8 +38,10 @@
import org.apache.hadoop.ozone.container.common.impl.StorageLocationReport;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume.VolumeState;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,11 +51,18 @@
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* VolumeSet to manage volumes in a DataNode.
* VolumeSet to manage HDDS volumes in a DataNode.
*/
public class VolumeSet {
@ -78,6 +88,14 @@ public class VolumeSet {
*/
private EnumMap<StorageType, List<HddsVolume>> volumeStateMap;
/**
* An executor for periodic disk checks.
*/
final ScheduledExecutorService diskCheckerService;
final ScheduledFuture<?> periodicDiskChecker;
private static final long DISK_CHECK_INTERVAL_MINUTES = 15;
/**
* A Reentrant Read Write Lock to synchronize volume operations in VolumeSet.
* Any update to {@link VolumeSet#volumeMap},
@ -90,6 +108,7 @@ public class VolumeSet {
private String clusterID;
private Runnable shutdownHook;
private final HddsVolumeChecker volumeChecker;
public VolumeSet(String dnUuid, Configuration conf)
throws IOException {
@ -102,11 +121,30 @@ public VolumeSet(String dnUuid, String clusterID, Configuration conf)
this.clusterID = clusterID;
this.conf = conf;
this.volumeSetRWLock = new ReentrantReadWriteLock();
this.volumeChecker = getVolumeChecker(conf);
this.diskCheckerService = Executors.newScheduledThreadPool(
1, r -> new Thread(r, "Periodic HDDS volume checker"));
this.periodicDiskChecker =
diskCheckerservice.scheduleWithFixedDelay(() -> {
try {
checkAllVolumes();
} catch (IOException e) {
LOG.warn("Exception while checking disks", e);
}
}, DISK_CHECK_INTERVAL_MINUTES, DISK_CHECK_INTERVAL_MINUTES,
TimeUnit.MINUTES);
initializeVolumeSet();
}
// Add DN volumes configured through ConfigKeys to volumeMap.
@VisibleForTesting
HddsVolumeChecker getVolumeChecker(Configuration conf)
throws DiskChecker.DiskErrorException {
return new HddsVolumeChecker(conf, new Timer());
}
/**
* Add DN volumes configured through ConfigKeys to volumeMap.
*/
private void initializeVolumeSet() throws IOException {
volumeMap = new ConcurrentHashMap<>();
failedVolumeMap = new ConcurrentHashMap<>();
@ -123,7 +161,7 @@ private void initializeVolumeSet() throws IOException {
}
for (StorageType storageType : StorageType.values()) {
volumeStateMap.put(storageType, new ArrayList<HddsVolume>());
volumeStateMap.put(storageType, new ArrayList<>());
}
for (String locationString : rawLocations) {
@ -139,6 +177,12 @@ private void initializeVolumeSet() throws IOException {
volumeStateMap.get(hddsVolume.getStorageType()).add(hddsVolume);
LOG.info("Added Volume : {} to VolumeSet",
hddsVolume.getHddsRootDir().getPath());
if (!hddsVolume.getHddsRootDir().mkdirs() &&
!hddsVolume.getHddsRootDir().exists()) {
throw new IOException("Failed to create HDDS storage dir " +
hddsVolume.getHddsRootDir());
}
} catch (IOException e) {
HddsVolume volume = new HddsVolume.Builder(locationString)
.failedVolume(true).build();
@ -147,8 +191,10 @@ private void initializeVolumeSet() throws IOException {
}
}
checkAllVolumes();
if (volumeMap.size() == 0) {
throw new DiskOutOfSpaceException("No storage location configured");
throw new DiskOutOfSpaceException("No storage locations configured");
}
// Ensure volume threads are stopped and scm df is saved during shutdown.
@ -159,6 +205,52 @@ private void initializeVolumeSet() throws IOException {
SHUTDOWN_HOOK_PRIORITY);
}
/**
* Run a synchronous parallel check of all HDDS volumes, removing
* failed volumes.
*/
private void checkAllVolumes() throws IOException {
List<HddsVolume> allVolumes = getVolumesList();
Set<HddsVolume> failedVolumes;
try {
failedVolumes = volumeChecker.checkAllVolumes(allVolumes);
} catch (InterruptedException e) {
throw new IOException("Interrupted while running disk check", e);
}
if (failedVolumes.size() > 0) {
LOG.warn("checkAllVolumes got {} failed volumes - {}",
failedVolumes.size(), failedVolumes);
handleVolumeFailures(failedVolumes);
} else {
LOG.debug("checkAllVolumes encountered no failures");
}
}
/**
* Handle one or more failed volumes.
* @param failedVolumes
*/
private void handleVolumeFailures(Set<HddsVolume> failedVolumes) {
for (HddsVolume v: failedVolumes) {
this.writeLock();
try {
// Immediately mark the volume as failed so it is unavailable
// for new containers.
volumeMap.remove(v.getHddsRootDir().getPath());
failedVolumeMap.putIfAbsent(v.getHddsRootDir().getPath(), v);
} finally {
this.writeUnlock();
}
// TODO:
// 1. Mark all closed containers on the volume as unhealthy.
// 2. Consider stopping IO on open containers and tearing down
// active pipelines.
// 3. Handle Ratis log disk failure.
}
}
/**
* If Version file exists and the {@link VolumeSet#clusterID} is not set yet,
* assign it the value from Version file. Otherwise, check that the given
@ -225,12 +317,12 @@ private HddsVolume createVolume(String locationString,
// Add a volume to VolumeSet
public boolean addVolume(String dataDir) {
boolean addVolume(String dataDir) {
return addVolume(dataDir, StorageType.DEFAULT);
}
// Add a volume to VolumeSet
public boolean addVolume(String volumeRoot, StorageType storageType) {
private boolean addVolume(String volumeRoot, StorageType storageType) {
String hddsRoot = HddsVolumeUtil.getHddsRoot(volumeRoot);
boolean success;
@ -330,16 +422,22 @@ private void saveVolumeSetUsed() {
}
/**
* Shutdown's the volumeset, if saveVolumeSetUsed is false, call's
* {@link VolumeSet#saveVolumeSetUsed}.
* Shutdown the volumeset.
*/
public void shutdown() {
saveVolumeSetUsed();
stopDiskChecker();
if (shutdownHook != null) {
ShutdownHookManager.get().removeShutdownHook(shutdownHook);
}
}
private void stopDiskChecker() {
periodicDiskChecker.cancel(true);
volumeChecker.shutdownAndWait(0, TimeUnit.SECONDS);
diskCheckerService.shutdownNow();
}
@VisibleForTesting
public List<HddsVolume> getVolumesList() {
return ImmutableList.copyOf(volumeMap.values());

View File

@ -160,6 +160,7 @@ public void stop() {
writeChannel.stop();
readChannel.stop();
hddsDispatcher.shutdown();
volumeSet.shutdown();
}

View File

@ -0,0 +1,212 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.volume;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult.*;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
/**
* Tests for {@link HddsVolumeChecker}.
*/
@RunWith(Parameterized.class)
public class TestHddsVolumeChecker {
public static final Logger LOG = LoggerFactory.getLogger(
TestHddsVolumeChecker.class);
@Rule
public TestName testName = new TestName();
@Rule
public Timeout globalTimeout = new Timeout(30_000);
/**
* Run each test case for each possible value of {@link VolumeCheckResult}.
* Including "null" for 'throw exception'.
* @return
*/
@Parameters(name="{0}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (VolumeCheckResult result : VolumeCheckResult.values()) {
values.add(new Object[] {result});
}
values.add(new Object[] {null});
return values;
}
/**
* When null, the check call should throw an exception.
*/
private final VolumeCheckResult expectedVolumeHealth;
private static final int NUM_VOLUMES = 2;
public TestHddsVolumeChecker(VolumeCheckResult expectedVolumeHealth) {
this.expectedVolumeHealth = expectedVolumeHealth;
}
/**
* Test {@link HddsVolumeChecker#checkVolume} propagates the
* check to the delegate checker.
*
* @throws Exception
*/
@Test
public void testCheckOneVolume() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final HddsVolume volume = makeVolumes(1, expectedVolumeHealth).get(0);
final HddsVolumeChecker checker =
new HddsVolumeChecker(new HdfsConfiguration(), new FakeTimer());
checker.setDelegateChecker(new DummyChecker());
final AtomicLong numCallbackInvocations = new AtomicLong(0);
/**
* Request a check and ensure it triggered {@link HddsVolume#check}.
*/
boolean result =
checker.checkVolume(volume, (healthyVolumes, failedVolumes) -> {
numCallbackInvocations.incrementAndGet();
if (expectedVolumeHealth != null &&
expectedVolumeHealth != FAILED) {
assertThat(healthyVolumes.size(), is(1));
assertThat(failedVolumes.size(), is(0));
} else {
assertThat(healthyVolumes.size(), is(0));
assertThat(failedVolumes.size(), is(1));
}
});
GenericTestUtils.waitFor(() -> numCallbackInvocations.get() > 0, 5, 10000);
// Ensure that the check was invoked at least once.
verify(volume, times(1)).check(anyObject());
if (result) {
assertThat(numCallbackInvocations.get(), is(1L));
}
}
/**
* Test {@link HddsVolumeChecker#checkAllVolumes} propagates
* checks for all volumes to the delegate checker.
*
* @throws Exception
*/
@Test
public void testCheckAllVolumes() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final List<HddsVolume> volumes = makeVolumes(
NUM_VOLUMES, expectedVolumeHealth);
final HddsVolumeChecker checker =
new HddsVolumeChecker(new HdfsConfiguration(), new FakeTimer());
checker.setDelegateChecker(new DummyChecker());
Set<HddsVolume> failedVolumes = checker.checkAllVolumes(volumes);
LOG.info("Got back {} failed volumes", failedVolumes.size());
if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
assertThat(failedVolumes.size(), is(NUM_VOLUMES));
} else {
assertTrue(failedVolumes.isEmpty());
}
// Ensure each volume's check() method was called exactly once.
for (HddsVolume volume : volumes) {
verify(volume, times(1)).check(anyObject());
}
}
/**
* A checker that wraps the result of {@link HddsVolume#check} in
* an ImmediateFuture.
*/
static class DummyChecker
implements AsyncChecker<Boolean, VolumeCheckResult> {
@Override
public Optional<ListenableFuture<VolumeCheckResult>> schedule(
Checkable<Boolean, VolumeCheckResult> target,
Boolean context) {
try {
LOG.info("Returning success for volume check");
return Optional.of(
Futures.immediateFuture(target.check(context)));
} catch (Exception e) {
LOG.info("check routine threw exception " + e);
return Optional.of(Futures.immediateFailedFuture(e));
}
}
@Override
public void shutdownAndWait(long timeout, TimeUnit timeUnit)
throws InterruptedException {
// Nothing to cancel.
}
}
static List<HddsVolume> makeVolumes(
int numVolumes, VolumeCheckResult health) throws Exception {
final List<HddsVolume> volumes = new ArrayList<>(numVolumes);
for (int i = 0; i < numVolumes; ++i) {
final HddsVolume volume = mock(HddsVolume.class);
if (health != null) {
when(volume.check(any(Boolean.class))).thenReturn(health);
when(volume.check(isNull())).thenReturn(health);
} else {
final DiskErrorException de = new DiskErrorException("Fake Exception");
when(volume.check(any(Boolean.class))).thenThrow(de);
when(volume.check(isNull())).thenThrow(de);
}
volumes.add(volume);
}
return volumes;
}
}

View File

@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.volume;
import com.google.common.collect.Iterables;
import org.apache.commons.io.FileUtils;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdfs.server.datanode.checker.AsyncChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Timer;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Verify that {@link VolumeSet} correctly checks for failed disks
* during initialization.
*/
public class TestVolumeSetDiskChecks {
public static final Logger LOG = LoggerFactory.getLogger(
TestVolumeSetDiskChecks.class);
@Rule
public Timeout globalTimeout = new Timeout(30_000);
@Rule
public ExpectedException thrown = ExpectedException.none();
Configuration conf = null;
/**
* Cleanup volume directories.
*/
@After
public void cleanup() {
final Collection<String> dirs = conf.getTrimmedStringCollection(
DFS_DATANODE_DATA_DIR_KEY);
for (String d: dirs) {
FileUtils.deleteQuietly(new File(d));
}
}
/**
* Verify that VolumeSet creates volume root directories at startup.
* @throws IOException
*/
@Test
public void testOzoneDirsAreCreated() throws IOException {
final int numVolumes = 2;
conf = getConfWithDataNodeDirs(numVolumes);
final VolumeSet volumeSet =
new VolumeSet(UUID.randomUUID().toString(), conf);
assertThat(volumeSet.getVolumesList().size(), is(numVolumes));
assertThat(volumeSet.getFailedVolumesList().size(), is(0));
// Verify that the Ozone dirs were created during initialization.
Collection<String> dirs = conf.getTrimmedStringCollection(
DFS_DATANODE_DATA_DIR_KEY);
for (String d : dirs) {
assertTrue(new File(d).isDirectory());
}
}
/**
* Verify that bad volumes are filtered at startup.
* @throws IOException
*/
@Test
public void testBadDirectoryDetection() throws IOException {
final int numVolumes = 5;
final int numBadVolumes = 2;
conf = getConfWithDataNodeDirs(numVolumes);
final VolumeSet volumeSet = new VolumeSet(
UUID.randomUUID().toString(), conf) {
@Override
HddsVolumeChecker getVolumeChecker(Configuration conf)
throws DiskErrorException {
return new DummyChecker(conf, new Timer(), numBadVolumes);
}
};
assertThat(volumeSet.getFailedVolumesList().size(), is(numBadVolumes));
assertThat(volumeSet.getVolumesList().size(), is(numVolumes - numBadVolumes));
}
/**
* Verify that initialization fails if all volumes are bad.
*/
@Test
public void testAllVolumesAreBad() throws IOException {
final int numVolumes = 5;
conf = getConfWithDataNodeDirs(numVolumes);
thrown.expect(IOException.class);
final VolumeSet volumeSet = new VolumeSet(
UUID.randomUUID().toString(), conf) {
@Override
HddsVolumeChecker getVolumeChecker(Configuration conf)
throws DiskErrorException {
return new DummyChecker(conf, new Timer(), numVolumes);
}
};
}
/**
* Create a Configuration with the specified number of DataNode
* storage directories.
* @param numDirs the number of storage directories to configure.
*/
private Configuration getConfWithDataNodeDirs(int numDirs) {
final Configuration conf = new OzoneConfiguration();
final List<String> dirs = new ArrayList<>();
for (int i = 0; i < numDirs; ++i) {
dirs.add(GenericTestUtils.getRandomizedTestDir().getPath());
}
conf.set(DFS_DATANODE_DATA_DIR_KEY, String.join(",", dirs));
return conf;
}
/**
* A no-op checker that fails the given number of volumes and succeeds
* the rest.
*/
static class DummyChecker extends HddsVolumeChecker {
private final int numBadVolumes;
public DummyChecker(Configuration conf, Timer timer, int numBadVolumes)
throws DiskErrorException {
super(conf, timer);
this.numBadVolumes = numBadVolumes;
}
@Override
public Set<HddsVolume> checkAllVolumes(Collection<HddsVolume> volumes)
throws InterruptedException {
// Return the first 'numBadVolumes' as failed.
return ImmutableSet.copyOf(Iterables.limit(volumes, numBadVolumes));
}
}
}