HDFS-11182. Update DataNode to use DatasetVolumeChecker. Contributed by Arpit Agarwal.

This commit is contained in:
Xiaoyu Yao 2016-12-20 13:53:07 -08:00
parent 5daa8d8631
commit f678080dbd
18 changed files with 234 additions and 298 deletions

View File

@ -74,6 +74,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -85,7 +86,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -108,6 +108,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker; import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.BlockReportOptions;
@ -369,11 +370,7 @@ public class DataNode extends ReconfigurableBase
SaslDataTransferClient saslClient; SaslDataTransferClient saslClient;
SaslDataTransferServer saslServer; SaslDataTransferServer saslServer;
private ObjectName dataNodeInfoBeanName; private ObjectName dataNodeInfoBeanName;
private Thread checkDiskErrorThread = null; private volatile long lastDiskErrorCheck;
protected final int checkDiskErrorInterval;
private boolean checkDiskErrorFlag = false;
private Object checkDiskErrorMutex = new Object();
private long lastDiskErrorCheck;
private String supergroup; private String supergroup;
private boolean isPermissionEnabled; private boolean isPermissionEnabled;
private String dnUserName = null; private String dnUserName = null;
@ -389,6 +386,7 @@ public class DataNode extends ReconfigurableBase
@Nullable @Nullable
private final StorageLocationChecker storageLocationChecker; private final StorageLocationChecker storageLocationChecker;
private final DatasetVolumeChecker volumeChecker;
private final SocketFactory socketFactory; private final SocketFactory socketFactory;
@ -407,7 +405,7 @@ public class DataNode extends ReconfigurableBase
*/ */
@VisibleForTesting @VisibleForTesting
@InterfaceAudience.LimitedPrivate("HDFS") @InterfaceAudience.LimitedPrivate("HDFS")
DataNode(final Configuration conf) { DataNode(final Configuration conf) throws DiskErrorException {
super(conf); super(conf);
this.tracer = createTracer(conf); this.tracer = createTracer(conf);
this.tracerConfigurationManager = this.tracerConfigurationManager =
@ -420,11 +418,10 @@ public class DataNode extends ReconfigurableBase
this.connectToDnViaHostname = false; this.connectToDnViaHostname = false;
this.blockScanner = new BlockScanner(this, this.getConf()); this.blockScanner = new BlockScanner(this, this.getConf());
this.pipelineSupportECN = false; this.pipelineSupportECN = false;
this.checkDiskErrorInterval =
ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
this.socketFactory = NetUtils.getDefaultSocketFactory(conf); this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
initOOBTimeout(); initOOBTimeout();
storageLocationChecker = null; storageLocationChecker = null;
volumeChecker = new DatasetVolumeChecker(conf, new Timer());
} }
/** /**
@ -464,8 +461,7 @@ public class DataNode extends ReconfigurableBase
",hdfs-" + ",hdfs-" +
conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED"); conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
this.checkDiskErrorInterval = this.volumeChecker = new DatasetVolumeChecker(conf, new Timer());
ThreadLocalRandom.current().nextInt(5000, (int) (5000 * 1.25));
// Determine whether we should try to pass file descriptors to clients. // Determine whether we should try to pass file descriptors to clients.
if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,
@ -1918,11 +1914,6 @@ public class DataNode extends ReconfigurableBase
} }
} }
// Interrupt the checkDiskErrorThread and terminate it.
if(this.checkDiskErrorThread != null) {
this.checkDiskErrorThread.interrupt();
}
// Record the time of initial notification // Record the time of initial notification
long timeNotified = Time.monotonicNow(); long timeNotified = Time.monotonicNow();
@ -1944,6 +1935,8 @@ public class DataNode extends ReconfigurableBase
} }
} }
volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
if (storageLocationChecker != null) { if (storageLocationChecker != null) {
storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS); storageLocationChecker.shutdownAndWait(1, TimeUnit.SECONDS);
} }
@ -2051,14 +2044,17 @@ public class DataNode extends ReconfigurableBase
* Check if there is a disk failure asynchronously and if so, handle the error * Check if there is a disk failure asynchronously and if so, handle the error
*/ */
public void checkDiskErrorAsync() { public void checkDiskErrorAsync() {
synchronized(checkDiskErrorMutex) { volumeChecker.checkAllVolumesAsync(
checkDiskErrorFlag = true; data, (healthyVolumes, failedVolumes) -> {
if(checkDiskErrorThread == null) { if (failedVolumes.size() > 0) {
startCheckDiskErrorThread(); LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}",
checkDiskErrorThread.start(); failedVolumes.size(), failedVolumes);
LOG.info("Starting CheckDiskError Thread"); } else {
} LOG.debug("checkDiskErrorAsync: no volume failures detected");
} }
lastDiskErrorCheck = Time.monotonicNow();
handleVolumeFailures(failedVolumes);
});
} }
private void handleDiskError(String errMsgr) { private void handleDiskError(String errMsgr) {
@ -3208,11 +3204,40 @@ public class DataNode extends ReconfigurableBase
} }
/** /**
* Check the disk error * Check the disk error synchronously.
*/ */
private void checkDiskError() { @VisibleForTesting
Set<StorageLocation> unhealthyLocations = data.checkDataDir(); public void checkDiskError() throws IOException {
if (unhealthyLocations != null && !unhealthyLocations.isEmpty()) { Set<FsVolumeSpi> unhealthyVolumes;
try {
unhealthyVolumes = volumeChecker.checkAllVolumes(data);
lastDiskErrorCheck = Time.monotonicNow();
} catch (InterruptedException e) {
LOG.error("Interruped while running disk check", e);
throw new IOException("Interrupted while running disk check", e);
}
if (unhealthyVolumes.size() > 0) {
LOG.warn("checkDiskError got {} failed volumes - {}",
unhealthyVolumes.size(), unhealthyVolumes);
handleVolumeFailures(unhealthyVolumes);
} else {
LOG.debug("checkDiskError encountered no failures");
}
}
private void handleVolumeFailures(Set<FsVolumeSpi> unhealthyVolumes) {
data.handleVolumeFailures(unhealthyVolumes);
Set<StorageLocation> unhealthyLocations = new HashSet<>(
unhealthyVolumes.size());
if (!unhealthyVolumes.isEmpty()) {
StringBuilder sb = new StringBuilder("DataNode failed volumes:");
for (FsVolumeSpi vol : unhealthyVolumes) {
unhealthyLocations.add(vol.getStorageLocation());
sb.append(vol.getStorageLocation()).append(";");
}
try { try {
// Remove all unhealthy volumes from DataNode. // Remove all unhealthy volumes from DataNode.
removeVolumes(unhealthyLocations, false); removeVolumes(unhealthyLocations, false);
@ -3220,57 +3245,14 @@ public class DataNode extends ReconfigurableBase
LOG.warn("Error occurred when removing unhealthy storage dirs: " LOG.warn("Error occurred when removing unhealthy storage dirs: "
+ e.getMessage(), e); + e.getMessage(), e);
} }
StringBuilder sb = new StringBuilder("DataNode failed volumes:"); LOG.info(sb.toString());
for (StorageLocation location : unhealthyLocations) {
sb.append(location + ";");
}
handleDiskError(sb.toString()); handleDiskError(sb.toString());
} }
} }
/**
* Starts a new thread which will check for disk error check request
* every 5 sec
*/
private void startCheckDiskErrorThread() {
checkDiskErrorThread = new Thread(new Runnable() {
@Override
public void run() {
while(shouldRun) {
boolean tempFlag ;
synchronized(checkDiskErrorMutex) {
tempFlag = checkDiskErrorFlag;
checkDiskErrorFlag = false;
}
if(tempFlag) {
try {
checkDiskError();
} catch (Exception e) {
LOG.warn("Unexpected exception occurred while checking disk error " + e);
checkDiskErrorThread = null;
return;
}
synchronized(checkDiskErrorMutex) {
lastDiskErrorCheck = Time.monotonicNow();
}
}
try {
Thread.sleep(checkDiskErrorInterval);
} catch (InterruptedException e) {
LOG.debug("InterruptedException in check disk error thread", e);
checkDiskErrorThread = null;
return;
}
}
}
});
}
public long getLastDiskErrorCheck() { public long getLastDiskErrorCheck() {
synchronized(checkDiskErrorMutex) {
return lastDiskErrorCheck; return lastDiskErrorCheck;
} }
}
@Override @Override
public SpanReceiverInfo[] listSpanReceivers() throws IOException { public SpanReceiverInfo[] listSpanReceivers() throws IOException {

View File

@ -27,7 +27,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -161,37 +160,54 @@ public class DatasetVolumeChecker {
* @param dataset - FsDatasetSpi to be checked. * @param dataset - FsDatasetSpi to be checked.
* @return set of failed volumes. * @return set of failed volumes.
*/ */
public Set<StorageLocation> checkAllVolumes( public Set<FsVolumeSpi> checkAllVolumes(
final FsDatasetSpi<? extends FsVolumeSpi> dataset) final FsDatasetSpi<? extends FsVolumeSpi> dataset)
throws InterruptedException { throws InterruptedException {
final long gap = timer.monotonicNow() - lastAllVolumesCheck;
if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) { if (gap < minDiskCheckGapMs) {
numSkippedChecks.incrementAndGet(); numSkippedChecks.incrementAndGet();
LOG.trace(
"Skipped checking all volumes, time since last check {} is less " +
"than the minimum gap between checks ({} ms).",
gap, minDiskCheckGapMs);
return Collections.emptySet();
}
final FsDatasetSpi.FsVolumeReferences references =
dataset.getFsVolumeReferences();
if (references.size() == 0) {
LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
return Collections.emptySet(); return Collections.emptySet();
} }
lastAllVolumesCheck = timer.monotonicNow(); lastAllVolumesCheck = timer.monotonicNow();
final Set<StorageLocation> healthyVolumes = new HashSet<>(); final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
final Set<StorageLocation> failedVolumes = new HashSet<>(); final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
final Set<StorageLocation> allVolumes = new HashSet<>(); final Set<FsVolumeSpi> allVolumes = new HashSet<>();
final FsDatasetSpi.FsVolumeReferences references = final AtomicLong numVolumes = new AtomicLong(references.size());
dataset.getFsVolumeReferences(); final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch resultsLatch = new CountDownLatch(references.size());
for (int i = 0; i < references.size(); ++i) { for (int i = 0; i < references.size(); ++i) {
final FsVolumeReference reference = references.getReference(i); final FsVolumeReference reference = references.getReference(i);
allVolumes.add(reference.getVolume().getStorageLocation()); allVolumes.add(reference.getVolume());
ListenableFuture<VolumeCheckResult> future = ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
LOG.info("Scheduled health check for volume {}", reference.getVolume()); LOG.info("Scheduled health check for volume {}", reference.getVolume());
Futures.addCallback(future, new ResultHandler( Futures.addCallback(future, new ResultHandler(
reference, healthyVolumes, failedVolumes, resultsLatch, null)); reference, healthyVolumes, failedVolumes, numVolumes, new Callback() {
@Override
public void call(Set<FsVolumeSpi> ignored1,
Set<FsVolumeSpi> ignored2) {
latch.countDown();
}
}));
} }
// Wait until our timeout elapses, after which we give up on // Wait until our timeout elapses, after which we give up on
// the remaining volumes. // the remaining volumes.
if (!resultsLatch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) { if (!latch.await(maxAllowedTimeForCheckMs, TimeUnit.MILLISECONDS)) {
LOG.warn("checkAllVolumes timed out after {} ms" + LOG.warn("checkAllVolumes timed out after {} ms" +
maxAllowedTimeForCheckMs); maxAllowedTimeForCheckMs);
} }
@ -225,18 +241,28 @@ public class DatasetVolumeChecker {
public boolean checkAllVolumesAsync( public boolean checkAllVolumesAsync(
final FsDatasetSpi<? extends FsVolumeSpi> dataset, final FsDatasetSpi<? extends FsVolumeSpi> dataset,
Callback callback) { Callback callback) {
final long gap = timer.monotonicNow() - lastAllVolumesCheck;
if (timer.monotonicNow() - lastAllVolumesCheck < minDiskCheckGapMs) { if (gap < minDiskCheckGapMs) {
numSkippedChecks.incrementAndGet(); numSkippedChecks.incrementAndGet();
LOG.trace(
"Skipped checking all volumes, time since last check {} is less " +
"than the minimum gap between checks ({} ms).",
gap, minDiskCheckGapMs);
return false;
}
final FsDatasetSpi.FsVolumeReferences references =
dataset.getFsVolumeReferences();
if (references.size() == 0) {
LOG.warn("checkAllVolumesAsync - no volumes can be referenced");
return false; return false;
} }
lastAllVolumesCheck = timer.monotonicNow(); lastAllVolumesCheck = timer.monotonicNow();
final Set<StorageLocation> healthyVolumes = new HashSet<>(); final Set<FsVolumeSpi> healthyVolumes = new HashSet<>();
final Set<StorageLocation> failedVolumes = new HashSet<>(); final Set<FsVolumeSpi> failedVolumes = new HashSet<>();
final FsDatasetSpi.FsVolumeReferences references = final AtomicLong numVolumes = new AtomicLong(references.size());
dataset.getFsVolumeReferences();
final CountDownLatch latch = new CountDownLatch(references.size());
LOG.info("Checking {} volumes", references.size()); LOG.info("Checking {} volumes", references.size());
for (int i = 0; i < references.size(); ++i) { for (int i = 0; i < references.size(); ++i) {
@ -245,7 +271,7 @@ public class DatasetVolumeChecker {
ListenableFuture<VolumeCheckResult> future = ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT);
Futures.addCallback(future, new ResultHandler( Futures.addCallback(future, new ResultHandler(
reference, healthyVolumes, failedVolumes, latch, callback)); reference, healthyVolumes, failedVolumes, numVolumes, callback));
} }
numAsyncDatasetChecks.incrementAndGet(); numAsyncDatasetChecks.incrementAndGet();
return true; return true;
@ -260,8 +286,8 @@ public class DatasetVolumeChecker {
* @param healthyVolumes set of volumes that passed disk checks. * @param healthyVolumes set of volumes that passed disk checks.
* @param failedVolumes set of volumes that failed disk checks. * @param failedVolumes set of volumes that failed disk checks.
*/ */
void call(Set<StorageLocation> healthyVolumes, void call(Set<FsVolumeSpi> healthyVolumes,
Set<StorageLocation> failedVolumes); Set<FsVolumeSpi> failedVolumes);
} }
/** /**
@ -273,8 +299,10 @@ public class DatasetVolumeChecker {
* *
* @param volume the volume that is to be checked. * @param volume the volume that is to be checked.
* @param callback callback to be invoked when the volume check completes. * @param callback callback to be invoked when the volume check completes.
* @return true if the check was scheduled and the callback will be invoked.
* false otherwise.
*/ */
public void checkVolume( public boolean checkVolume(
final FsVolumeSpi volume, final FsVolumeSpi volume,
Callback callback) { Callback callback) {
FsVolumeReference volumeReference; FsVolumeReference volumeReference;
@ -283,14 +311,15 @@ public class DatasetVolumeChecker {
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
// The volume has already been closed. // The volume has already been closed.
callback.call(new HashSet<>(), new HashSet<>()); callback.call(new HashSet<>(), new HashSet<>());
return; return false;
} }
ListenableFuture<VolumeCheckResult> future = ListenableFuture<VolumeCheckResult> future =
delegateChecker.schedule(volume, IGNORED_CONTEXT); delegateChecker.schedule(volume, IGNORED_CONTEXT);
numVolumeChecks.incrementAndGet(); numVolumeChecks.incrementAndGet();
Futures.addCallback(future, new ResultHandler( Futures.addCallback(future, new ResultHandler(
volumeReference, new HashSet<>(), new HashSet<>(), volumeReference, new HashSet<>(), new HashSet<>(),
new CountDownLatch(1), callback)); new AtomicLong(1), callback));
return true;
} }
/** /**
@ -299,26 +328,35 @@ public class DatasetVolumeChecker {
private class ResultHandler private class ResultHandler
implements FutureCallback<VolumeCheckResult> { implements FutureCallback<VolumeCheckResult> {
private final FsVolumeReference reference; private final FsVolumeReference reference;
private final Set<StorageLocation> failedVolumes; private final Set<FsVolumeSpi> failedVolumes;
private final Set<StorageLocation> healthyVolumes; private final Set<FsVolumeSpi> healthyVolumes;
private final CountDownLatch latch; private final AtomicLong volumeCounter;
private final AtomicLong numVolumes;
@Nullable @Nullable
private final Callback callback; private final Callback callback;
/**
*
* @param reference FsVolumeReference to be released when the check is
* complete.
* @param healthyVolumes set of healthy volumes. If the disk check is
* successful, add the volume here.
* @param failedVolumes set of failed volumes. If the disk check fails,
* add the volume here.
* @param semaphore semaphore used to trigger callback invocation.
* @param callback invoked when the semaphore can be successfully acquired.
*/
ResultHandler(FsVolumeReference reference, ResultHandler(FsVolumeReference reference,
Set<StorageLocation> healthyVolumes, Set<FsVolumeSpi> healthyVolumes,
Set<StorageLocation> failedVolumes, Set<FsVolumeSpi> failedVolumes,
CountDownLatch latch, AtomicLong volumeCounter,
@Nullable Callback callback) { @Nullable Callback callback) {
Preconditions.checkState(reference != null); Preconditions.checkState(reference != null);
this.reference = reference; this.reference = reference;
this.healthyVolumes = healthyVolumes; this.healthyVolumes = healthyVolumes;
this.failedVolumes = failedVolumes; this.failedVolumes = failedVolumes;
this.latch = latch; this.volumeCounter = volumeCounter;
this.callback = callback; this.callback = callback;
numVolumes = new AtomicLong(latch.getCount());
} }
@Override @Override
@ -355,13 +393,13 @@ public class DatasetVolumeChecker {
private void markHealthy() { private void markHealthy() {
synchronized (DatasetVolumeChecker.this) { synchronized (DatasetVolumeChecker.this) {
healthyVolumes.add(reference.getVolume().getStorageLocation()); healthyVolumes.add(reference.getVolume());
} }
} }
private void markFailed() { private void markFailed() {
synchronized (DatasetVolumeChecker.this) { synchronized (DatasetVolumeChecker.this) {
failedVolumes.add(reference.getVolume().getStorageLocation()); failedVolumes.add(reference.getVolume());
} }
} }
@ -372,10 +410,8 @@ public class DatasetVolumeChecker {
private void invokeCallback() { private void invokeCallback() {
try { try {
latch.countDown(); final long remaining = volumeCounter.decrementAndGet();
if (callback != null && remaining == 0) {
if (numVolumes.decrementAndGet() == 0 &&
callback != null) {
callback.call(healthyVolumes, failedVolumes); callback.call(healthyVolumes, failedVolumes);
} }
} catch(Exception e) { } catch(Exception e) {

View File

@ -494,8 +494,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/** /**
* Check if all the data directories are healthy * Check if all the data directories are healthy
* @return A set of unhealthy data directories. * @return A set of unhealthy data directories.
* @param failedVolumes
*/ */
Set<StorageLocation> checkDataDir(); void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes);
/** /**
* Shutdown the FSDataset * Shutdown the FSDataset

View File

@ -2067,10 +2067,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* if some volumes failed - the caller must emove all the blocks that belong * if some volumes failed - the caller must emove all the blocks that belong
* to these failed volumes. * to these failed volumes.
* @return the failed volumes. Returns null if no volume failed. * @return the failed volumes. Returns null if no volume failed.
* @param failedVolumes
*/ */
@Override // FsDatasetSpi @Override // FsDatasetSpi
public Set<StorageLocation> checkDataDir() { public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
return volumes.checkDirs(); volumes.handleVolumeFailures(failedVolumes);
} }

View File

@ -959,13 +959,6 @@ public class FsVolumeImpl implements FsVolumeSpi {
return cacheExecutor; return cacheExecutor;
} }
void checkDirs() throws DiskErrorException {
// TODO:FEDERATION valid synchronization
for(BlockPoolSlice s : bpSlices.values()) {
s.checkDirs();
}
}
@Override @Override
public VolumeCheckResult check(VolumeCheckContext ignored) public VolumeCheckResult check(VolumeCheckContext ignored)
throws DiskErrorException { throws DiskErrorException {

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -44,7 +43,6 @@ import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
class FsVolumeList { class FsVolumeList {
@ -235,23 +233,14 @@ class FsVolumeList {
* Use {@link checkDirsLock} to allow only one instance of checkDirs() call. * Use {@link checkDirsLock} to allow only one instance of checkDirs() call.
* *
* @return list of all the failed volumes. * @return list of all the failed volumes.
* @param failedVolumes
*/ */
Set<StorageLocation> checkDirs() { void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
try (AutoCloseableLock lock = checkDirsLock.acquire()) { try (AutoCloseableLock lock = checkDirsLock.acquire()) {
Set<StorageLocation> failedLocations = null;
// Make a copy of volumes for performing modification
final List<FsVolumeImpl> volumeList = getVolumes();
for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) { for(FsVolumeSpi vol : failedVolumes) {
final FsVolumeImpl fsv = i.next(); FsVolumeImpl fsv = (FsVolumeImpl) vol;
try (FsVolumeReference ref = fsv.obtainReference()) { try (FsVolumeReference ref = fsv.obtainReference()) {
fsv.checkDirs();
} catch (DiskErrorException e) {
FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ", e);
if (failedLocations == null) {
failedLocations = new HashSet<>(1);
}
failedLocations.add(fsv.getStorageLocation());
addVolumeFailureInfo(fsv); addVolumeFailureInfo(fsv);
removeVolume(fsv); removeVolume(fsv);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -262,13 +251,7 @@ class FsVolumeList {
} }
} }
if (failedLocations != null && failedLocations.size() > 0) {
FsDatasetImpl.LOG.warn("Completed checkDirs. Found " +
failedLocations.size() + " failure volumes.");
}
waitVolumeRemoved(5000, checkDirsLockCondition); waitVolumeRemoved(5000, checkDirsLockCondition);
return failedLocations;
} }
} }

View File

@ -30,9 +30,11 @@ import java.net.URL;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -52,6 +54,8 @@ public class TestBlockStatsMXBean {
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = new HdfsConfiguration();
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
0, TimeUnit.MILLISECONDS);
cluster = null; cluster = null;
StorageType[][] types = new StorageType[6][]; StorageType[][] types = new StorageType[6][];
for (int i=0; i<3; i++) { for (int i=0; i<3; i++) {

View File

@ -489,7 +489,17 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override @Override
public FsVolumeReference obtainReference() throws ClosedChannelException { public FsVolumeReference obtainReference() throws ClosedChannelException {
return null; return new FsVolumeReference() {
@Override
public void close() throws IOException {
// no-op.
}
@Override
public FsVolumeSpi getVolume() {
return SimulatedVolume.this;
}
};
} }
@Override @Override
@ -1078,9 +1088,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
@Override @Override
public Set<StorageLocation> checkDataDir() { public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
// nothing to check for simulated data set
return null;
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
@ -1349,7 +1357,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override @Override
public FsVolumeReferences getFsVolumeReferences() { public FsVolumeReferences getFsVolumeReferences() {
throw new UnsupportedOperationException(); return new FsVolumeReferences(Collections.singletonList(volume));
} }
@Override @Override

View File

@ -62,6 +62,7 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -113,6 +114,8 @@ public class TestDataNodeHotSwapVolumes {
1000); 1000);
/* Allow 1 volume failure */ /* Allow 1 volume failure */
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
0, TimeUnit.MILLISECONDS);
MiniDFSNNTopology nnTopology = MiniDFSNNTopology nnTopology =
MiniDFSNNTopology.simpleFederatedTopology(numNameNodes); MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);

View File

@ -33,6 +33,7 @@ import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -118,6 +119,8 @@ public class TestDataNodeVolumeFailure {
// Allow a single volume failure (there are two volumes) // Allow a single volume failure (there are two volumes)
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 30);
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
0, TimeUnit.MILLISECONDS);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(dn_num).build();
cluster.waitActive(); cluster.waitActive();
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();

View File

@ -30,6 +30,7 @@ import java.io.File;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -659,6 +660,8 @@ public class TestDataNodeVolumeFailureReporting {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000); conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1000);
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY,
failedVolumesTolerated); failedVolumesTolerated);
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
0, TimeUnit.MILLISECONDS);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes) cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes)
.storagesPerDatanode(storagesPerDatanode).build(); .storagesPerDatanode(storagesPerDatanode).build();
cluster.waitActive(); cluster.waitActive();

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -144,6 +145,8 @@ public class TestDataNodeVolumeFailureToleration {
// Bring up two additional datanodes that need both of their volumes // Bring up two additional datanodes that need both of their volumes
// functioning in order to stay up. // functioning in order to stay up.
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0); conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
0, TimeUnit.MILLISECONDS);
cluster.startDataNodes(conf, 2, true, null, null); cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive(); cluster.waitActive();
final DatanodeManager dm = cluster.getNamesystem().getBlockManager( final DatanodeManager dm = cluster.getNamesystem().getBlockManager(

View File

@ -26,7 +26,9 @@ import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -49,8 +51,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -69,6 +71,9 @@ public class TestDiskError {
public void setUp() throws Exception { public void setUp() throws Exception {
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 512L);
conf.setTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
0, TimeUnit.MILLISECONDS);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive(); cluster.waitActive();
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
@ -213,19 +218,22 @@ public class TestDiskError {
* Before refactoring the code the above function was not getting called * Before refactoring the code the above function was not getting called
* @throws IOException, InterruptedException * @throws IOException, InterruptedException
*/ */
@Test @Test(timeout=60000)
public void testcheckDiskError() throws IOException, InterruptedException { public void testcheckDiskError() throws Exception {
if(cluster.getDataNodes().size() <= 0) { if(cluster.getDataNodes().size() <= 0) {
cluster.startDataNodes(conf, 1, true, null, null); cluster.startDataNodes(conf, 1, true, null, null);
cluster.waitActive(); cluster.waitActive();
} }
DataNode dataNode = cluster.getDataNodes().get(0); DataNode dataNode = cluster.getDataNodes().get(0);
long slackTime = dataNode.checkDiskErrorInterval/2;
//checking for disk error //checking for disk error
dataNode.checkDiskErrorAsync(); final long lastCheckTimestamp = dataNode.getLastDiskErrorCheck();
Thread.sleep(dataNode.checkDiskErrorInterval); dataNode.checkDiskError();
long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck(); GenericTestUtils.waitFor(new Supplier<Boolean>() {
assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((Time.monotonicNow()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime))); @Override
public Boolean get() {
return dataNode.getLastDiskErrorCheck() > lastCheckTimestamp;
}
}, 100, 60000);
} }
@Test @Test

View File

@ -35,7 +35,10 @@ import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.*; import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -103,8 +106,8 @@ public class TestDatasetVolumeChecker {
*/ */
checker.checkVolume(volume, new DatasetVolumeChecker.Callback() { checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
@Override @Override
public void call(Set<StorageLocation> healthyVolumes, public void call(Set<FsVolumeSpi> healthyVolumes,
Set<StorageLocation> failedVolumes) { Set<FsVolumeSpi> failedVolumes) {
numCallbackInvocations.incrementAndGet(); numCallbackInvocations.incrementAndGet();
if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) { if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) {
assertThat(healthyVolumes.size(), is(1)); assertThat(healthyVolumes.size(), is(1));
@ -138,7 +141,7 @@ public class TestDatasetVolumeChecker {
new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer()); new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer());
checker.setDelegateChecker(new DummyChecker()); checker.setDelegateChecker(new DummyChecker());
Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset); Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
LOG.info("Got back {} failed volumes", failedVolumes.size()); LOG.info("Got back {} failed volumes", failedVolumes.size());
if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) { if (expectedVolumeHealth == null || expectedVolumeHealth == FAILED) {
@ -174,8 +177,8 @@ public class TestDatasetVolumeChecker {
dataset, new DatasetVolumeChecker.Callback() { dataset, new DatasetVolumeChecker.Callback() {
@Override @Override
public void call( public void call(
Set<StorageLocation> healthyVolumes, Set<FsVolumeSpi> healthyVolumes,
Set<StorageLocation> failedVolumes) { Set<FsVolumeSpi> failedVolumes) {
LOG.info("Got back {} failed volumes", failedVolumes.size()); LOG.info("Got back {} failed volumes", failedVolumes.size());
if (expectedVolumeHealth == null || if (expectedVolumeHealth == null ||
expectedVolumeHealth == FAILED) { expectedVolumeHealth == FAILED) {
@ -236,7 +239,7 @@ public class TestDatasetVolumeChecker {
return dataset; return dataset;
} }
private static List<FsVolumeSpi> makeVolumes( static List<FsVolumeSpi> makeVolumes(
int numVolumes, VolumeCheckResult health) throws Exception { int numVolumes, VolumeCheckResult health) throws Exception {
final List<FsVolumeSpi> volumes = new ArrayList<>(numVolumes); final List<FsVolumeSpi> volumes = new ArrayList<>(numVolumes);
for (int i = 0; i < numVolumes; ++i) { for (int i = 0; i < numVolumes; ++i) {

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.*; import org.apache.hadoop.hdfs.server.datanode.fsdataset.*;
import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -47,6 +48,19 @@ public class TestDatasetVolumeCheckerFailures {
public static final Logger LOG =LoggerFactory.getLogger( public static final Logger LOG =LoggerFactory.getLogger(
TestDatasetVolumeCheckerFailures.class); TestDatasetVolumeCheckerFailures.class);
private FakeTimer timer;
private Configuration conf;
private static final long MIN_DISK_CHECK_GAP_MS = 1000; // 1 second.
@Before
public void commonInit() {
timer = new FakeTimer();
conf = new HdfsConfiguration();
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
MIN_DISK_CHECK_GAP_MS, TimeUnit.MILLISECONDS);
}
/** /**
* Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}. * Test timeout in {@link DatasetVolumeChecker#checkAllVolumes}.
* @throws Exception * @throws Exception
@ -61,14 +75,13 @@ public class TestDatasetVolumeCheckerFailures {
TestDatasetVolumeChecker.makeDataset(volumes); TestDatasetVolumeChecker.makeDataset(volumes);
// Create a disk checker with a very low timeout. // Create a disk checker with a very low timeout.
final HdfsConfiguration conf = new HdfsConfiguration();
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY, conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
1, TimeUnit.SECONDS); 1, TimeUnit.SECONDS);
final DatasetVolumeChecker checker = final DatasetVolumeChecker checker =
new DatasetVolumeChecker(conf, new FakeTimer()); new DatasetVolumeChecker(conf, new FakeTimer());
// Ensure that the hung volume is detected as failed. // Ensure that the hung volume is detected as failed.
Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset); Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
assertThat(failedVolumes.size(), is(1)); assertThat(failedVolumes.size(), is(1));
} }
@ -86,10 +99,10 @@ public class TestDatasetVolumeCheckerFailures {
final FsDatasetSpi<FsVolumeSpi> dataset = final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(volumes); TestDatasetVolumeChecker.makeDataset(volumes);
DatasetVolumeChecker checker = DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
new DatasetVolumeChecker(new HdfsConfiguration(), new FakeTimer()); Set<FsVolumeSpi> failedVolumes = checker.checkAllVolumes(dataset);
Set<StorageLocation> failedVolumes = checker.checkAllVolumes(dataset);
assertThat(failedVolumes.size(), is(0)); assertThat(failedVolumes.size(), is(0));
assertThat(checker.getNumSyncDatasetChecks(), is(0L));
// The closed volume should not have been checked as it cannot // The closed volume should not have been checked as it cannot
// be referenced. // be referenced.
@ -98,13 +111,10 @@ public class TestDatasetVolumeCheckerFailures {
@Test(timeout=60000) @Test(timeout=60000)
public void testMinGapIsEnforcedForSyncChecks() throws Exception { public void testMinGapIsEnforcedForSyncChecks() throws Exception {
final List<FsVolumeSpi> volumes =
TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
final FsDatasetSpi<FsVolumeSpi> dataset = final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(Collections.emptyList()); TestDatasetVolumeChecker.makeDataset(volumes);
final FakeTimer timer = new FakeTimer();
final Configuration conf = new HdfsConfiguration();
final long minGapMs = 100;
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
minGapMs, TimeUnit.MILLISECONDS);
final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer); final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
checker.checkAllVolumes(dataset); checker.checkAllVolumes(dataset);
@ -116,7 +126,7 @@ public class TestDatasetVolumeCheckerFailures {
assertThat(checker.getNumSkippedChecks(), is(1L)); assertThat(checker.getNumSkippedChecks(), is(1L));
// Re-check after advancing the timer. Ensure the check is performed. // Re-check after advancing the timer. Ensure the check is performed.
timer.advance(minGapMs); timer.advance(MIN_DISK_CHECK_GAP_MS);
checker.checkAllVolumes(dataset); checker.checkAllVolumes(dataset);
assertThat(checker.getNumSyncDatasetChecks(), is(2L)); assertThat(checker.getNumSyncDatasetChecks(), is(2L));
assertThat(checker.getNumSkippedChecks(), is(1L)); assertThat(checker.getNumSkippedChecks(), is(1L));
@ -124,13 +134,10 @@ public class TestDatasetVolumeCheckerFailures {
@Test(timeout=60000) @Test(timeout=60000)
public void testMinGapIsEnforcedForASyncChecks() throws Exception { public void testMinGapIsEnforcedForASyncChecks() throws Exception {
final List<FsVolumeSpi> volumes =
TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY);
final FsDatasetSpi<FsVolumeSpi> dataset = final FsDatasetSpi<FsVolumeSpi> dataset =
TestDatasetVolumeChecker.makeDataset(Collections.emptyList()); TestDatasetVolumeChecker.makeDataset(volumes);
final FakeTimer timer = new FakeTimer();
final Configuration conf = new HdfsConfiguration();
final long minGapMs = 100;
conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
minGapMs, TimeUnit.MILLISECONDS);
final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer); final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer);
checker.checkAllVolumesAsync(dataset, null); checker.checkAllVolumesAsync(dataset, null);
@ -142,7 +149,7 @@ public class TestDatasetVolumeCheckerFailures {
assertThat(checker.getNumSkippedChecks(), is(1L)); assertThat(checker.getNumSkippedChecks(), is(1L));
// Re-check after advancing the timer. Ensure the check is performed. // Re-check after advancing the timer. Ensure the check is performed.
timer.advance(minGapMs); timer.advance(MIN_DISK_CHECK_GAP_MS);
checker.checkAllVolumesAsync(dataset, null); checker.checkAllVolumesAsync(dataset, null);
assertThat(checker.getNumAsyncDatasetChecks(), is(2L)); assertThat(checker.getNumAsyncDatasetChecks(), is(2L));
assertThat(checker.getNumSkippedChecks(), is(1L)); assertThat(checker.getNumSkippedChecks(), is(1L));

View File

@ -239,8 +239,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
} }
@Override @Override
public Set<StorageLocation> checkDataDir() { public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
return null;
} }
@Override @Override

View File

@ -52,13 +52,10 @@ import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.FakeTimer; import org.apache.hadoop.util.FakeTimer;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.junit.Assert; import org.junit.Assert;
@ -66,8 +63,6 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Matchers; import org.mockito.Matchers;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -76,16 +71,18 @@ import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DN_CACHED_DFSUSED_CHECK_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
@ -94,13 +91,10 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf; import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -339,68 +333,6 @@ public class TestFsDatasetImpl {
assertEquals(numExistingVolumes, getNumVolumes()); assertEquals(numExistingVolumes, getNumVolumes());
} }
@Test(timeout = 5000)
public void testChangeVolumeWithRunningCheckDirs() throws IOException {
RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
new RoundRobinVolumeChoosingPolicy<>();
conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
final BlockScanner blockScanner = new BlockScanner(datanode);
final FsVolumeList volumeList = new FsVolumeList(
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
// Initialize FsVolumeList with 5 mock volumes.
final int NUM_VOLUMES = 5;
for (int i = 0; i < NUM_VOLUMES; i++) {
FsVolumeImpl volume = mock(FsVolumeImpl.class);
oldVolumes.add(volume);
when(volume.getStorageLocation()).thenReturn(
StorageLocation.parse(new File("data" + i).toURI().toString()));
when(volume.checkClosed()).thenReturn(true);
FsVolumeReference ref = mock(FsVolumeReference.class);
when(ref.getVolume()).thenReturn(volume);
volumeList.addVolume(ref);
}
// When call checkDirs() on the 2nd volume, anther "thread" removes the 5th
// volume and add another volume. It does not affect checkDirs() running.
final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
final FsVolumeReference newRef = mock(FsVolumeReference.class);
when(newRef.getVolume()).thenReturn(newVolume);
when(newVolume.getStorageLocation()).thenReturn(
StorageLocation.parse(new File("data4").toURI().toString()));
FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
volumeList.removeVolume(
StorageLocation.parse((new File("data4")).toURI().toString()),
false);
volumeList.addVolume(newRef);
return null;
}
}).when(blockedVolume).checkDirs();
FsVolumeImpl brokenVolume = volumeList.getVolumes().get(2);
doThrow(new DiskChecker.DiskErrorException("broken"))
.when(brokenVolume).checkDirs();
volumeList.checkDirs();
// Since FsVolumeImpl#checkDirs() get a snapshot of the list of volumes
// before running removeVolume(), it is supposed to run checkDirs() on all
// the old volumes.
for (FsVolumeImpl volume : oldVolumes) {
verify(volume).checkDirs();
}
// New volume is not visible to checkDirs() process.
verify(newVolume, never()).checkDirs();
assertTrue(volumeList.getVolumes().contains(newVolume));
assertFalse(volumeList.getVolumes().contains(brokenVolume));
assertEquals(NUM_VOLUMES - 1, volumeList.getVolumes().size());
}
@Test @Test
public void testAddVolumeFailureReleasesInUseLock() throws IOException { public void testAddVolumeFailureReleasesInUseLock() throws IOException {
FsDatasetImpl spyDataset = spy(dataset); FsDatasetImpl spyDataset = spy(dataset);
@ -717,6 +649,9 @@ public class TestFsDatasetImpl {
Configuration config = new HdfsConfiguration(); Configuration config = new HdfsConfiguration();
config.setLong( config.setLong(
DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000); DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, 1000);
config.setTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, 0,
TimeUnit.MILLISECONDS);
config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); config.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
@ -734,6 +669,8 @@ public class TestFsDatasetImpl {
getVolume(block); getVolume(block);
File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem() File finalizedDir = volume.getFinalizedDir(cluster.getNamesystem()
.getBlockPoolId()); .getBlockPoolId());
LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
DatanodeInfo info = lb.getLocations()[0];
if (finalizedDir.exists()) { if (finalizedDir.exists()) {
// Remove write and execute access so that checkDiskErrorThread detects // Remove write and execute access so that checkDiskErrorThread detects
@ -744,15 +681,14 @@ public class TestFsDatasetImpl {
Assert.assertTrue("Reference count for the volume should be greater " Assert.assertTrue("Reference count for the volume should be greater "
+ "than 0", volume.getReferenceCount() > 0); + "than 0", volume.getReferenceCount() > 0);
// Invoke the synchronous checkDiskError method // Invoke the synchronous checkDiskError method
dataNode.getFSDataset().checkDataDir(); dataNode.checkDiskError();
// Sleep for 1 second so that datanode can interrupt and cluster clean up // Sleep for 1 second so that datanode can interrupt and cluster clean up
GenericTestUtils.waitFor(new Supplier<Boolean>() { GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override public Boolean get() { @Override public Boolean get() {
return volume.getReferenceCount() == 0; return volume.getReferenceCount() == 0;
} }
}, 100, 10); }, 100, 10);
LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0); assertThat(dataNode.getFSDataset().getNumFailedVolumes(), is(1));
DatanodeInfo info = lb.getLocations()[0];
try { try {
out.close(); out.close();

View File

@ -108,43 +108,6 @@ public class TestFsVolumeList {
} }
} }
@Test(timeout=30000)
public void testCheckDirsWithClosedVolume() throws IOException {
FsVolumeList volumeList = new FsVolumeList(
Collections.<VolumeFailureInfo>emptyList(), blockScanner, blockChooser);
final List<FsVolumeImpl> volumes = new ArrayList<>();
for (int i = 0; i < 3; i++) {
File curDir = new File(baseDir, "volume-" + i);
curDir.mkdirs();
FsVolumeImpl volume = new FsVolumeImplBuilder()
.setConf(conf)
.setDataset(dataset)
.setStorageID("storage-id")
.setStorageDirectory(
new StorageDirectory(StorageLocation.parse(curDir.getPath())))
.build();
volumes.add(volume);
volumeList.addVolume(volume.obtainReference());
}
// Close the 2nd volume.
volumes.get(1).setClosed();
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return volumes.get(1).checkClosed();
}
}, 100, 3000);
} catch (TimeoutException e) {
fail("timed out while waiting for volume to be removed.");
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
// checkDirs() should ignore the 2nd volume since it is closed.
volumeList.checkDirs();
}
@Test(timeout=30000) @Test(timeout=30000)
public void testReleaseVolumeRefIfNoBlockScanner() throws IOException { public void testReleaseVolumeRefIfNoBlockScanner() throws IOException {
FsVolumeList volumeList = new FsVolumeList( FsVolumeList volumeList = new FsVolumeList(