HDFS-5185. DN fails to startup if one of the data dir is full. Contributed by Vinayakumar B.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1615504 13f79535-47bb-0310-9956-ffa450edef68
Vinayakumar B 2014-08-04 08:43:51 +00:00
parent 30a9ec26f7
commit 33518e5613
5 changed files with 30 additions and 14 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -433,6 +433,8 @@ Release 2.6.0 - UNRELEASED
     HDFS-5723. Append failed FINALIZED replica should not be accepted as valid
     when that block is underconstruction (vinayakumarb)
 
+    HDFS-5185. DN fails to startup if one of the data dir is full. (vinayakumarb)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java

@@ -253,7 +253,7 @@ class BlockReceiver implements Closeable {
       if (cause != null) { // possible disk error
         ioe = cause;
-        datanode.checkDiskError();
+        datanode.checkDiskErrorAsync();
       }
 
       throw ioe;
@ -329,7 +329,7 @@ public void close() throws IOException {
} }
// disk check // disk check
if(ioe != null) { if(ioe != null) {
datanode.checkDiskError(); datanode.checkDiskErrorAsync();
throw ioe; throw ioe;
} }
} }
@@ -639,7 +639,7 @@ private int receivePacket() throws IOException {
           manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
-        datanode.checkDiskError();
+        datanode.checkDiskErrorAsync();
         throw iex;
       }
     }
@@ -1208,7 +1208,7 @@ public void run() {
       } catch (IOException e) {
         LOG.warn("IOException in BlockReceiver.run(): ", e);
         if (running) {
-          datanode.checkDiskError();
+          datanode.checkDiskErrorAsync();
           LOG.info(myString, e);
           running = false;
           if (!Thread.interrupted()) { // failure not caused by interruption
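
All four BlockReceiver hunks apply the same pattern: an IOException on the write path may indicate a failing volume, so the receiver schedules the asynchronous disk check and rethrows rather than scanning volumes inline. A minimal self-contained sketch of that pattern (the class, the DiskChecker interface, and writeChunk are illustrative stand-ins, not BlockReceiver's real structure):

import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch, not BlockReceiver itself: an IOException on the
// write path may mean a failing volume, so schedule an asynchronous disk
// check and rethrow, instead of blocking the client write on a
// synchronous volume scan.
class ReceiverSketch {
  interface DiskChecker {          // stands in for the DataNode reference
    void checkDiskErrorAsync();
  }

  private final OutputStream out;
  private final DiskChecker datanode;

  ReceiverSketch(OutputStream out, DiskChecker datanode) {
    this.out = out;
    this.datanode = datanode;
  }

  void writeChunk(byte[] buf) throws IOException {
    try {
      out.write(buf);                 // fails if the underlying disk is bad
    } catch (IOException ioe) {
      datanode.checkDiskErrorAsync(); // non-blocking: just flags the checker
      throw ioe;                      // caller still sees the original error
    }
  }
}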

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

@@ -1075,6 +1075,11 @@ void initBlockPool(BPOfferService bpos) throws IOException {
     // In the case that this is the first block pool to connect, initialize
     // the dataset, block scanners, etc.
     initStorage(nsInfo);
+
+    // Exclude failed disks before initializing the block pools to avoid startup
+    // failures.
+    checkDiskError();
+
     initPeriodicScanners(conf);
 
     data.addBlockPool(nsInfo.getBlockPoolID(), conf);
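
This hunk is the heart of the fix: the synchronous check runs after initStorage(nsInfo) and before data.addBlockPool(...), so a full or failed data dir is pruned before block-pool initialization can trip over it. A rough sketch of that ordering, assuming hypothetical helpers (the real pruning lives behind data.checkDataDir(), not in DataNode itself):

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the startup ordering above: pruning bad volumes
// synchronously before addBlockPool means block-pool initialization only
// ever touches healthy volumes, so one full/failed data dir no longer
// aborts DataNode startup.
class StartupSketch {
  private final List<File> volumes = new ArrayList<>();

  void initBlockPool(String bpid) {
    checkDiskError();       // synchronous: prune failed volumes first
    addBlockPool(bpid);     // only healthy volumes remain
  }

  private void checkDiskError() {
    for (Iterator<File> it = volumes.iterator(); it.hasNext();) {
      File dir = it.next();
      // Illustrative health test; the real check is data.checkDataDir().
      if (!dir.canWrite()) {
        it.remove();        // exclude the failed volume
      }
    }
  }

  private void addBlockPool(String bpid) {
    for (File dir : volumes) {
      new File(dir, bpid).mkdirs();   // per-volume block-pool directory
    }
  }
}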
@ -1510,9 +1515,9 @@ public void shutdown() {
/** /**
* Check if there is a disk failure and if so, handle the error * Check if there is a disk failure asynchronously and if so, handle the error
*/ */
public void checkDiskError() { public void checkDiskErrorAsync() {
synchronized(checkDiskErrorMutex) { synchronized(checkDiskErrorMutex) {
checkDiskErrorFlag = true; checkDiskErrorFlag = true;
if(checkDiskErrorThread == null) { if(checkDiskErrorThread == null) {
@@ -1821,7 +1826,7 @@ public void run() {
         LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
             targets[0] + " got ", ie);
         // check if there are any disk problem
-        checkDiskError();
+        checkDiskErrorAsync();
       } finally {
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
@@ -2759,7 +2764,18 @@ DataStorage getStorage() {
   public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
 
+  /**
+   * Check the disk error
+   */
+  private void checkDiskError() {
+    try {
+      data.checkDataDir();
+    } catch (DiskErrorException de) {
+      handleDiskError(de.getMessage());
+    }
+  }
+
   /**
    * Starts a new thread which will check for disk error check request
    * every 5 sec
@@ -2776,9 +2792,7 @@ public void run() {
             }
             if(tempFlag) {
               try {
-                data.checkDataDir();
-              } catch (DiskErrorException de) {
-                handleDiskError(de.getMessage());
+                checkDiskError();
               } catch (Exception e) {
                 LOG.warn("Unexpected exception occurred while checking disk error " + e);
                 checkDiskErrorThread = null;
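
Together, the DataNode hunks split disk checking in two: the public checkDiskErrorAsync() only sets checkDiskErrorFlag under checkDiskErrorMutex (spawning the checker thread if needed), while the new private checkDiskError() does the actual data.checkDataDir() work, called both synchronously at startup and from the polling thread. A simplified, self-contained model of that flag-and-poll arrangement (the 5-second interval follows the diff; class and field names are illustrative):

import java.util.concurrent.TimeUnit;

// Simplified model of the flag-and-poll checker: hot paths only set a
// flag; one daemon thread wakes every few seconds and runs the real,
// possibly slow, check.
class DiskCheckerSketch {
  private final Object mutex = new Object();
  private boolean checkRequested = false;
  private Thread checkerThread;

  /** Cheap and non-blocking; safe to call from I/O error paths. */
  public void checkDiskErrorAsync() {
    synchronized (mutex) {
      checkRequested = true;
      if (checkerThread == null) {
        checkerThread = new Thread(this::pollLoop, "disk-checker");
        checkerThread.setDaemon(true);
        checkerThread.start();
      }
    }
  }

  private void pollLoop() {
    while (true) {
      boolean doCheck;
      synchronized (mutex) {
        doCheck = checkRequested;
        checkRequested = false;
      }
      if (doCheck) {
        checkDiskError();             // the real, synchronous check
      }
      try {
        TimeUnit.SECONDS.sleep(5);    // the diff's thread polls every 5 sec
      } catch (InterruptedException ie) {
        return;
      }
    }
  }

  private void checkDiskError() {
    // Stand-in for data.checkDataDir(): would prune failed volumes and
    // pass any DiskErrorException message to handleDiskError(...).
  }
}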

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -1151,7 +1151,7 @@ File validateBlockFile(String bpid, Block b) {
         return f;
 
       // if file is not null, but doesn't exist - possibly disk failed
-      datanode.checkDiskError();
+      datanode.checkDiskErrorAsync();
     }
 
     if (LOG.isDebugEnabled()) {

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

@@ -201,7 +201,7 @@ public void testLocalDirs() throws Exception {
   }
 
   /**
-   * Checks whether {@link DataNode#checkDiskError()} is being called or not.
+   * Checks whether {@link DataNode#checkDiskErrorAsync()} is being called or not.
    * Before refactoring the code the above function was not getting called
    * @throws IOException, InterruptedException
    */
@@ -214,7 +214,7 @@ public void testcheckDiskError() throws IOException, InterruptedException {
     DataNode dataNode = cluster.getDataNodes().get(0);
     long slackTime = dataNode.checkDiskErrorInterval/2;
     //checking for disk error
-    dataNode.checkDiskError();
+    dataNode.checkDiskErrorAsync();
     Thread.sleep(dataNode.checkDiskErrorInterval);
     long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
     assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((Time.monotonicNow()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));