diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
index 9305b06942..facf58ed5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-5535.txt
@@ -93,3 +93,5 @@ HDFS-5535 subtasks:
HDFS-6015. Fix TestBlockRecovery
#testRaceBetweenReplicaRecoveryAndFinalizeBlock. (kihwal)
+
+ HDFS-5924. Utilize OOB upgrade message processing for writes. (kihwal)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index d908eb4e27..3c066c78f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@@ -268,6 +270,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
final int getFileBlockStorageLocationsTimeout;
final int retryTimesForGetLastBlockLength;
final int retryIntervalForGetLastBlockLength;
+ final long datanodeRestartTimeout;
final boolean useLegacyBlockReader;
final boolean useLegacyBlockReaderLocal;
@@ -411,6 +414,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory {
shortCircuitCacheStaleThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
+
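+    // The configured value is in seconds; convert it to milliseconds
+    // for internal deadline arithmetic.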
+ datanodeRestartTimeout = conf.getLong(
+ DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
+ DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT) * 1000;
}
private DataChecksum.Type getChecksumType(Configuration conf) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index d77bdc17f0..7789b60fe3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -94,6 +94,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_DEFAULT = 3000;
public static final String DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL = "dfs.client.write.exclude.nodes.cache.expiry.interval.millis";
public static final long DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10 * 60 * 1000; // 10 minutes, in ms
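+  // The two restart-related timeouts below are expressed in seconds and
+  // converted to milliseconds where they are read.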
+ public static final String DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY = "dfs.client.datanode-restart.timeout";
+ public static final long DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT = 30;
+ public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
+ public static final long DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT = 50;
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
public static final String DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY = "dfs.namenode.backup.http-address";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index dc5ccf4a5f..c1cf238d59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.BufferOverflowException;
@@ -335,6 +336,8 @@ public class DFSOutputStream extends FSOutputSummer
private String[] favoredNodes;
volatile boolean hasError = false;
volatile int errorIndex = -1;
+ volatile int restartingNodeIndex = -1; // Restarting node index
+ private long restartDeadline = 0; // Deadline of DN restart
private BlockConstructionStage stage; // block construction stage
private long bytesSent = 0; // number of bytes that've been sent
@@ -471,7 +474,7 @@ public class DFSOutputStream extends FSOutputSummer
try {
// process datanode IO errors if any
boolean doSleep = false;
- if (hasError && errorIndex>=0) {
+ if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
doSleep = processDatanodeError();
}
@@ -571,8 +574,12 @@ public class DFSOutputStream extends FSOutputSummer
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
- // write to primary DN
- errorIndex = 0;
+ // write to primary DN. If a failed or restarting node has already
+ // been recorded by the responder, the following call will have no
+ // effect. Pipeline recovery can handle only one node error at a
+ // time. If the primary node fails again during the recovery, it
+ // will be taken out then.
+ tryMarkPrimaryDatanodeFailed();
throw e;
}
lastPacket = Time.now();
@@ -609,12 +616,16 @@ public class DFSOutputStream extends FSOutputSummer
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
- DFSClient.LOG.warn("DataStreamer Exception", e);
+ // Log warning if there was a real error.
+ if (restartingNodeIndex == -1) {
+ DFSClient.LOG.warn("DataStreamer Exception", e);
+ }
if (e instanceof IOException) {
setLastException((IOException)e);
}
hasError = true;
- if (errorIndex == -1) { // not a datanode error
+ if (errorIndex == -1 && restartingNodeIndex == -1) {
+ // Not a datanode issue
streamerClosed = true;
}
}
@@ -694,6 +705,65 @@ public class DFSOutputStream extends FSOutputSummer
}
}
+ // The following synchronized methods are used whenever
+ // errorIndex or restartingNodeIndex is set. This is because
+  // check & set needs to be atomic. Simply reading the variables
+  // does not require synchronization. When the responder is
+  // not running (e.g. during pipeline recovery), there is no
+ // need to use these methods.
+
+ /** Set the error node index. Called by responder */
+ synchronized void setErrorIndex(int idx) {
+ errorIndex = idx;
+ }
+
+ /** Set the restarting node index. Called by responder */
+ synchronized void setRestartingNodeIndex(int idx) {
+ restartingNodeIndex = idx;
+ // If the data streamer has already set the primary node
+ // bad, clear it. It is likely that the write failed due to
+ // the DN shutdown. Even if it was a real failure, the pipeline
+ // recovery will take care of it.
+ errorIndex = -1;
+ }
+
+ /**
+ * This method is used when no explicit error report was received,
+     * but something failed. When the primary node is a suspect or the
+     * cause of the failure is unclear, the primary node is marked as failed.
+ */
+ synchronized void tryMarkPrimaryDatanodeFailed() {
+ // There should be no existing error and no ongoing restart.
+ if ((errorIndex == -1) && (restartingNodeIndex == -1)) {
+ errorIndex = 0;
+ }
+ }
+
+ /**
+ * Examine whether it is worth waiting for a node to restart.
+     * @param index the node index
+     * @return true if the node is the only one in the pipeline or is
+     *         the local node
+     */
+ boolean shouldWaitForRestart(int index) {
+ // Only one node in the pipeline.
+ if (nodes.length == 1) {
+ return true;
+ }
+
+ // Is it a local node?
+ InetAddress addr = null;
+ try {
+ addr = InetAddress.getByName(nodes[index].getIpAddr());
+ } catch (java.net.UnknownHostException e) {
+        // We are passing an IP address, so this should not happen.
+ assert false;
+ }
+
+ if (addr != null && NetUtils.isLocalAddress(addr)) {
+ return true;
+ }
+ return false;
+ }
+
//
// Processes responses from the datanodes. A packet is removed
// from the ackQueue when its response arrives.
@@ -727,8 +797,20 @@ public class DFSOutputStream extends FSOutputSummer
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final Status reply = ack.getReply(i);
+ // Restart will not be treated differently unless it is
+ // the local node or the only one in the pipeline.
+ if (PipelineAck.isRestartOOBStatus(reply) &&
+ shouldWaitForRestart(i)) {
+ restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
+ Time.now();
+ setRestartingNodeIndex(i);
+ String message = "A datanode is restarting: " + targets[i];
+ DFSClient.LOG.info(message);
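+            // Throwing here shuts down the responder; the streamer will
+            // then run pipeline recovery and wait out the restart deadline.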
+ throw new IOException(message);
+ }
+ // node error
if (reply != SUCCESS) {
- errorIndex = i; // first bad datanode
+ setErrorIndex(i); // first bad datanode
throw new IOException("Bad response " + reply +
" for block " + block +
" from datanode " +
@@ -777,12 +859,16 @@ public class DFSOutputStream extends FSOutputSummer
setLastException((IOException)e);
}
hasError = true;
- errorIndex = errorIndex==-1 ? 0 : errorIndex;
+ // If no explicit error report was received, mark the primary
+ // node as failed.
+ tryMarkPrimaryDatanodeFailed();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
- DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
- + " for block " + block, e);
+ if (restartingNodeIndex == -1) {
+ DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
+ + " for block " + block, e);
+ }
responderClosed = true;
}
}
@@ -1001,6 +1087,24 @@ public class DFSOutputStream extends FSOutputSummer
boolean success = false;
long newGS = 0L;
while (!success && !streamerClosed && dfsClient.clientRunning) {
+ // Sleep before reconnect if a dn is restarting.
+ // This process will be repeated until the deadline or the datanode
+ // starts back up.
+ if (restartingNodeIndex >= 0) {
+ // 4 seconds or the configured deadline period, whichever is shorter.
+          // This is the retry interval; recovery will be retried at this
+          // interval until the deadline expires or the restart succeeds.
+ long delay = Math.min(dfsClient.getConf().datanodeRestartTimeout,
+ 4000L);
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException ie) {
+ lastException.set(new IOException("Interrupted while waiting for " +
+ "datanode to restart. " + nodes[restartingNodeIndex]));
+ streamerClosed = true;
+ return false;
+ }
+ }
boolean isRecovery = hasError;
// remove bad datanode from list of datanodes.
// If errorIndex was not set (i.e. appends), then do not remove
@@ -1037,7 +1141,24 @@ public class DFSOutputStream extends FSOutputSummer
setPipeline(newnodes, newStorageIDs);
- hasError = false;
+ // Just took care of a node error while waiting for a node restart
+ if (restartingNodeIndex >= 0) {
+ // If the error came from a node further away than the restarting
+        // node, the restart must have completed.
+ if (errorIndex > restartingNodeIndex) {
+ restartingNodeIndex = -1;
+ } else if (errorIndex < restartingNodeIndex) {
+ // the node index has shifted.
+ restartingNodeIndex--;
+ } else {
+ // this shouldn't happen...
+ assert false;
+ }
+ }
+
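+      // While still waiting on a restart, keep hasError set so the
+      // recovery loop runs again after the wait.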
+ if (restartingNodeIndex == -1) {
+ hasError = false;
+ }
lastException.set(null);
errorIndex = -1;
}
@@ -1066,7 +1187,34 @@ public class DFSOutputStream extends FSOutputSummer
} else {
success = createBlockOutputStream(nodes, newGS, isRecovery);
}
- }
+
+ if (restartingNodeIndex >= 0) {
+ assert hasError == true;
+ // check errorIndex set above
+ if (errorIndex == restartingNodeIndex) {
+          // ignore it if it came from the restarting node
+ errorIndex = -1;
+ }
+ // still within the deadline
+ if (Time.now() < restartDeadline) {
+          continue; // within the deadline
+ }
+ // expired. declare the restarting node dead
+ restartDeadline = 0;
+ int expiredNodeIndex = restartingNodeIndex;
+ restartingNodeIndex = -1;
+ DFSClient.LOG.warn("Datanode did not restart in time: " +
+ nodes[expiredNodeIndex]);
+ // Mark the restarting node as failed. If there is any other failed
+ // node during the last pipeline construction attempt, it will not be
+ // overwritten/dropped. In this case, the restarting node will get
+ // excluded in the following attempt, if it still does not come up.
+ if (errorIndex == -1) {
+ errorIndex = expiredNodeIndex;
+ }
+ // From this point on, normal pipeline recovery applies.
+ }
+ } // while
if (success) {
// update pipeline at the namenode
@@ -1144,6 +1292,7 @@ public class DFSOutputStream extends FSOutputSummer
}
Status pipelineStatus = SUCCESS;
String firstBadLink = "";
+ boolean checkRestart = false;
if (DFSClient.LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
DFSClient.LOG.debug("pipeline = " + nodes[i]);
@@ -1192,6 +1341,16 @@ public class DFSOutputStream extends FSOutputSummer
pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
+        // Got a restart OOB ack.
+ // If a node is already restarting, this status is not likely from
+ // the same node. If it is from a different node, it is not
+ // from the local datanode. Thus it is safe to treat this as a
+ // regular node error.
+ if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
+ restartingNodeIndex == -1) {
+ checkRestart = true;
+ throw new IOException("A datanode is restarting.");
+ }
if (pipelineStatus != SUCCESS) {
if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
@@ -1205,9 +1364,12 @@ public class DFSOutputStream extends FSOutputSummer
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
-
+ restartingNodeIndex = -1;
+ hasError = false;
} catch (IOException ie) {
- DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+ if (restartingNodeIndex == -1) {
+ DFSClient.LOG.info("Exception in createBlockOutputStream", ie);
+ }
if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
DFSClient.LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to "
@@ -1230,8 +1392,18 @@ public class DFSOutputStream extends FSOutputSummer
}
}
} else {
+ assert checkRestart == false;
errorIndex = 0;
}
+ // Check whether there is a restart worth waiting for.
+ if (checkRestart && shouldWaitForRestart(errorIndex)) {
+ restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
+ Time.now();
+ restartingNodeIndex = errorIndex;
+ errorIndex = -1;
+ DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
+ nodes[restartingNodeIndex]);
+ }
hasError = true;
setLastException(ie);
result = false; // error
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 29d416ad52..3e6f5c8a41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -23,8 +23,10 @@ import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@@ -52,6 +55,7 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
@@ -116,6 +120,7 @@ class BlockReceiver implements Closeable {
private final boolean isTransfer;
private boolean syncOnClose;
+ private long restartBudget;
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
final String inAddr, final String myAddr,
@@ -135,6 +140,7 @@ class BlockReceiver implements Closeable {
this.clientname = clientname;
this.isDatanode = clientname.length() == 0;
this.isClient = !this.isDatanode;
+ this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
//for datanode, we have
//1: clientName.length() == 0, and
@@ -742,16 +748,35 @@ class BlockReceiver implements Closeable {
if (responder != null) {
// In case this datanode is shutting down for quick restart,
// send a special ack upstream.
- if (datanode.isRestarting()) {
+ if (datanode.isRestarting() && isClient && !isTransfer) {
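+        // Leave a restart meta file beside the block file, recording when
+        // the wait for this restart expires. If the datanode comes back
+        // before then, the replica is reloaded as an RBW replica.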
+ File blockFile = ((ReplicaInPipeline)replicaInfo).getBlockFile();
+ File restartMeta = new File(blockFile.getParent() +
+          File.separator + "." + blockFile.getName() + ".restart");
+ if (restartMeta.exists()) {
+ restartMeta.delete();
+ }
+ try {
+ FileWriter out = new FileWriter(restartMeta);
+ // write out the current time.
+          // Write out the expiry time, i.e. now plus the restart budget.
+ out.flush();
+ out.close();
+ } catch (IOException ioe) {
+ // The worst case is not recovering this RBW replica.
+ // Client will fall back to regular pipeline recovery.
+ }
try {
((PacketResponder) responder.getRunnable()).
sendOOBResponse(PipelineAck.getRestartOOBStatus());
+          // Even if the connection is closed right after the ack is
+          // flushed, the client may react to the closure before
+          // processing the ack. Insert a delay to lower the chance of
+          // the client missing the OOB ack.
+ Thread.sleep(1000);
} catch (InterruptedException ie) {
// It is already going down. Ignore this.
} catch (IOException ioe) {
LOG.info("Error sending OOB Ack.", ioe);
- // The OOB ack could not be sent. Since the datanode is going
- // down, this is ignored.
}
}
responder.interrupt();
@@ -1279,7 +1304,6 @@ class BlockReceiver implements Closeable {
&& offsetInBlock > replicaInfo.getBytesAcked()) {
replicaInfo.setBytesAcked(offsetInBlock);
}
-
// send my ack back to upstream datanode
replyAck.write(upstreamOut);
upstreamOut.flush();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 73f3661182..199a2c0c38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -46,6 +46,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_SUPPORTED_NA
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -82,6 +84,7 @@ public class DNConf {
final String encryptionAlgorithm;
final long xceiverStopTimeout;
+ final long restartReplicaExpiry;
final long maxLockedMemory;
@@ -157,6 +160,10 @@ public class DNConf {
this.maxLockedMemory = conf.getLong(
DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
+
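+    // Configured in seconds; converted to milliseconds here.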
+ this.restartReplicaExpiry = conf.getLong(
+ DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
+ DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index ed9ba589dc..e3e441028d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -21,9 +21,11 @@ import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
+import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DU;
@@ -36,11 +38,13 @@ import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Time;
/**
* A block pool slice represents a portion of a block pool stored on a volume.
@@ -191,9 +195,35 @@ class BlockPoolSlice {
newReplica = new FinalizedReplica(blockId,
blockFile.length(), genStamp, volume, blockFile.getParentFile());
} else {
- newReplica = new ReplicaWaitingToBeRecovered(blockId,
- validateIntegrityAndSetLength(blockFile, genStamp),
- genStamp, volume, blockFile.getParentFile());
+
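+          // A restart meta file is written by BlockReceiver when a
+          // datanode is shut down for a quick restart. If it exists and
+          // has not expired, load the replica as RBW instead of RWR.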
+ boolean loadRwr = true;
+ File restartMeta = new File(blockFile.getParent() +
+        File.separator + "." + blockFile.getName() + ".restart");
+ Scanner sc = null;
+ try {
+ sc = new Scanner(restartMeta);
+ // The restart meta file exists
+ if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+ // It didn't expire. Load the replica as a RBW.
+ newReplica = new ReplicaBeingWritten(blockId,
+ validateIntegrityAndSetLength(blockFile, genStamp),
+ genStamp, volume, blockFile.getParentFile(), null);
+ loadRwr = false;
+ }
+ restartMeta.delete();
+ } catch (FileNotFoundException fnfe) {
+ // nothing to do here
+ } finally {
+ if (sc != null) {
+ sc.close();
+ }
+ }
+      // The restart meta file doesn't exist or has expired.
+ if (loadRwr) {
+ newReplica = new ReplicaWaitingToBeRecovered(blockId,
+ validateIntegrityAndSetLength(blockFile, genStamp),
+ genStamp, volume, blockFile.getParentFile());
+ }
}
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
@@ -298,4 +328,4 @@ class BlockPoolSlice {
void shutdown() {
dfsUsage.shutdown();
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 26c7b70adb..2468cb1074 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -1030,6 +1030,18 @@
+
+<property>
+  <name>dfs.client.datanode-restart.timeout</name>
+  <value>30</value>
+  <description>
+    Expert only. The time to wait, in seconds, from reception of a
+    datanode shutdown notification for quick restart, until declaring
+    the datanode dead and invoking the normal recovery mechanisms.
+    The notification is sent by a datanode when it is being shut down
+    using the shutdownDatanode admin command with the upgrade option.
+  </description>
+</property>
  <name>dfs.nameservices</name>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index d583bb302c..04853bd2df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -162,19 +162,25 @@ public class TestClientProtocolForPipelineRecovery {
}
}
- /** Test recovery on restart OOB message */
+ /**
+ * Test recovery on restart OOB message. It also tests the delivery of
+ * OOB ack originating from the primary datanode. Since there is only
+ * one node in the cluster, failure of restart-recovery will fail the
+ * test.
+ */
@Test
public void testPipelineRecoveryOnOOB() throws Exception {
Configuration conf = new HdfsConfiguration();
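+    // Use a 15 second restart timeout instead of the default 30 seconds
+    // so a failed restart-recovery does not stall the test for long.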
+ conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "15");
MiniDFSCluster cluster = null;
try {
- int numDataNodes = 3;
+ int numDataNodes = 1;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
Path file = new Path("dataprotocol2.dat");
- DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L);
+ DFSTestUtil.createFile(fileSys, file, 10240L, (short)1, 0L);
DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
getWrappedStream());
out.write(1);
@@ -186,10 +192,66 @@ public class TestClientProtocolForPipelineRecovery {
// issue shutdown to the datanode.
final String[] args1 = {"-shutdownDatanode", dnAddr, "upgrade" };
Assert.assertEquals(0, dfsadmin.run(args1));
+ // Wait long enough to receive an OOB ack before closing the file.
+ Thread.sleep(4000);
+      // Restart the datanode
+ cluster.restartDataNode(0, true);
+      // The following forces a data packet and an end-of-block packet to be sent.
out.close();
- Thread.sleep(3000);
- final String[] args2 = {"-getDatanodeInfo", dnAddr };
- Assert.assertEquals(-1, dfsadmin.run(args2));
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /** Test restart timeout */
+ @Test
+ public void testPipelineRecoveryOnRestartFailure() throws Exception {
+ Configuration conf = new HdfsConfiguration();
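+    // Use a 5 second restart timeout so the restart deadline expires
+    // during the test and regular pipeline recovery takes over.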
+ conf.set(DFSConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY, "5");
+ MiniDFSCluster cluster = null;
+ try {
+ int numDataNodes = 2;
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+ cluster.waitActive();
+ FileSystem fileSys = cluster.getFileSystem();
+
+ Path file = new Path("dataprotocol3.dat");
+ DFSTestUtil.createFile(fileSys, file, 10240L, (short)2, 0L);
+ DFSOutputStream out = (DFSOutputStream)(fileSys.append(file).
+ getWrappedStream());
+ out.write(1);
+ out.hflush();
+
+ DFSAdmin dfsadmin = new DFSAdmin(conf);
+ DataNode dn = cluster.getDataNodes().get(0);
+ final String dnAddr1 = dn.getDatanodeId().getIpcAddr(false);
+ // issue shutdown to the datanode.
+ final String[] args1 = {"-shutdownDatanode", dnAddr1, "upgrade" };
+ Assert.assertEquals(0, dfsadmin.run(args1));
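+      // Wait long enough for the client to receive the restart OOB ack.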
+ Thread.sleep(4000);
+ // This should succeed without restarting the node. The restart will
+ // expire and regular pipeline recovery will kick in.
+ out.close();
+
+ // At this point there is only one node in the cluster.
+ out = (DFSOutputStream)(fileSys.append(file).
+ getWrappedStream());
+ out.write(1);
+ out.hflush();
+
+ dn = cluster.getDataNodes().get(1);
+ final String dnAddr2 = dn.getDatanodeId().getIpcAddr(false);
+ // issue shutdown to the datanode.
+ final String[] args2 = {"-shutdownDatanode", dnAddr2, "upgrade" };
+ Assert.assertEquals(0, dfsadmin.run(args2));
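+      // Wait for the restart OOB ack to be received and processed.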
+ Thread.sleep(4000);
+ try {
+ // close should fail
+ out.close();
+        Assert.fail("close should have thrown an exception");
+ } catch (IOException ioe) { }
} finally {
if (cluster != null) {
cluster.shutdown();