diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index ad1b3595cb..abb039c50c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -40,6 +40,7 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.ScriptBasedMapping; +import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.annotations.VisibleForTesting; @@ -118,6 +119,19 @@ public class ClientContext { private NodeBase clientNode; private boolean topologyResolutionEnabled; + private Daemon deadNodeDetectorThr = null; + + /** + * The switch to DeadNodeDetector. + */ + private boolean deadNodeDetectionEnabled = false; + + /** + * Detect the dead datanodes in advance, and share this information among all + * the DFSInputStreams in the same client. + */ + private DeadNodeDetector deadNodeDetector = null; + private ClientContext(String name, DfsClientConf conf, Configuration config) { final ShortCircuitConf scConf = conf.getShortCircuitConf(); @@ -134,6 +148,12 @@ private ClientContext(String name, DfsClientConf conf, this.byteArrayManager = ByteArrayManager.newInstance( conf.getWriteByteArrayManagerConf()); + this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled(); + if (deadNodeDetectionEnabled && deadNodeDetector == null) { + deadNodeDetector = new DeadNodeDetector(name); + deadNodeDetectorThr = new Daemon(deadNodeDetector); + deadNodeDetectorThr.start(); + } initTopologyResolution(config); } @@ -251,4 +271,33 @@ public int getNetworkDistance(DatanodeInfo datanodeInfo) throws IOException { datanodeInfo.getNetworkLocation()); return NetworkTopology.getDistanceByPath(clientNode, node); } + + /** + * The switch to DeadNodeDetector. If true, DeadNodeDetector is available. + */ + public boolean isDeadNodeDetectionEnabled() { + return deadNodeDetectionEnabled; + } + + /** + * Obtain DeadNodeDetector of the current client. + */ + public DeadNodeDetector getDeadNodeDetector() { + return deadNodeDetector; + } + + /** + * Close dead node detector thread. + */ + public void stopDeadNodeDetectorThread() { + if (deadNodeDetectorThr != null) { + deadNodeDetectorThr.interrupt(); + try { + deadNodeDetectorThr.join(3000); + } catch (InterruptedException e) { + LOG.warn("Encountered exception while waiting to join on dead " + + "node detector thread.", e); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 56280f3a8b..c19aa96d36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -44,6 +44,8 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadPoolExecutor; @@ -631,6 +633,8 @@ public synchronized void close() throws IOException { // lease renewal stops when all files are closed closeAllFilesBeingWritten(false); clientRunning = false; + // close dead node detector thread + clientContext.stopDeadNodeDetectorThread(); // close connections to the namenode closeConnectionToNamenode(); } @@ -3226,4 +3230,98 @@ public HAServiceProtocol.HAServiceState getHAServiceState() throws IOException { return namenode.getHAServiceState(); } + + /** + * If deadNodeDetectionEnabled is true, return the dead nodes that detected by + * all the DFSInputStreams in the same client. Otherwise return the dead nodes + * that detected by given DFSInputStream. + */ + public ConcurrentHashMap getDeadNodes( + DFSInputStream dfsInputStream) { + if (clientContext.isDeadNodeDetectionEnabled()) { + ConcurrentHashMap deadNodes = + new ConcurrentHashMap(); + if (dfsInputStream != null) { + deadNodes.putAll(dfsInputStream.getLocalDeadNodes()); + } + + Set detectDeadNodes = + clientContext.getDeadNodeDetector().clearAndGetDetectedDeadNodes(); + for (DatanodeInfo detectDeadNode : detectDeadNodes) { + deadNodes.put(detectDeadNode, detectDeadNode); + } + return deadNodes; + } else { + return dfsInputStream.getLocalDeadNodes(); + } + } + + /** + * If deadNodeDetectionEnabled is true, judgement based on whether this + * datanode is included or not in DeadNodeDetector. Otherwise judgment based + * given DFSInputStream. + */ + public boolean isDeadNode(DFSInputStream dfsInputStream, + DatanodeInfo datanodeInfo) { + if (isDeadNodeDetectionEnabled()) { + boolean isDeadNode = + clientContext.getDeadNodeDetector().isDeadNode(datanodeInfo); + if (dfsInputStream != null) { + isDeadNode = isDeadNode + || dfsInputStream.getLocalDeadNodes().contains(datanodeInfo); + } + return isDeadNode; + } else { + return dfsInputStream.getLocalDeadNodes().contains(datanodeInfo); + } + } + + /** + * Add given datanode in DeadNodeDetector. + */ + public void addNodeToDeadNodeDetector(DFSInputStream dfsInputStream, + DatanodeInfo datanodeInfo) { + if (!isDeadNodeDetectionEnabled()) { + LOG.debug("DeadNode detection is not enabled, skip to add node {}.", + datanodeInfo); + return; + } + clientContext.getDeadNodeDetector().addNodeToDetect(dfsInputStream, + datanodeInfo); + } + + /** + * Remove given datanode from DeadNodeDetector. + */ + public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream, + DatanodeInfo datanodeInfo) { + if (!isDeadNodeDetectionEnabled()) { + LOG.debug("DeadNode detection is not enabled, skip to remove node {}.", + datanodeInfo); + return; + } + clientContext.getDeadNodeDetector() + .removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo); + } + + /** + * Remove datanodes that given block placed on from DeadNodeDetector. + */ + public void removeNodeFromDeadNodeDetector(DFSInputStream dfsInputStream, + LocatedBlocks locatedBlocks) { + if (!isDeadNodeDetectionEnabled() || locatedBlocks == null) { + LOG.debug("DeadNode detection is not enabled or given block {} " + + "is null, skip to remove node.", locatedBlocks); + return; + } + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + for (DatanodeInfo datanodeInfo : locatedBlock.getLocations()) { + removeNodeFromDeadNodeDetector(dfsInputStream, datanodeInfo); + } + } + } + + private boolean isDeadNodeDetectionEnabled() { + return clientContext.isDeadNodeDetectionEnabled(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 757924d8a7..73237973e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -171,10 +171,26 @@ public class DFSInputStream extends FSInputStream private byte[] oneByteBuf; // used for 'int read()' - void addToDeadNodes(DatanodeInfo dnInfo) { + protected void addToLocalDeadNodes(DatanodeInfo dnInfo) { deadNodes.put(dnInfo, dnInfo); } + protected void removeFromLocalDeadNodes(DatanodeInfo dnInfo) { + deadNodes.remove(dnInfo); + } + + protected ConcurrentHashMap getLocalDeadNodes() { + return deadNodes; + } + + private void clearLocalDeadNodes() { + deadNodes.clear(); + } + + protected DFSClient getDFSClient() { + return dfsClient; + } + DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum, LocatedBlocks locatedBlocks) throws IOException { this.dfsClient = dfsClient; @@ -612,7 +628,8 @@ private synchronized DatanodeInfo blockSeekTo(long target) + "{}, add to deadNodes and continue. ", targetAddr, src, targetBlock.getBlock(), ex); // Put chosen node into dead list, continue - addToDeadNodes(chosenNode); + addToLocalDeadNodes(chosenNode); + dfsClient.addNodeToDeadNodeDetector(this, chosenNode); } } } @@ -663,28 +680,40 @@ protected BlockReader getBlockReader(LocatedBlock targetBlock, */ @Override public synchronized void close() throws IOException { - if (!closed.compareAndSet(false, true)) { - DFSClient.LOG.debug("DFSInputStream has been closed already"); - return; - } - dfsClient.checkOpen(); + try { + if (!closed.compareAndSet(false, true)) { + DFSClient.LOG.debug("DFSInputStream has been closed already"); + return; + } + dfsClient.checkOpen(); - if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) { - final StringBuilder builder = new StringBuilder(); - extendedReadBuffers.visitAll(new IdentityHashStore.Visitor() { - private String prefix = ""; - @Override - public void accept(ByteBuffer k, Object v) { - builder.append(prefix).append(k); - prefix = ", "; - } - }); - DFSClient.LOG.warn("closing file " + src + ", but there are still " + - "unreleased ByteBuffers allocated by read(). " + - "Please release " + builder.toString() + "."); + if ((extendedReadBuffers != null) && (!extendedReadBuffers.isEmpty())) { + final StringBuilder builder = new StringBuilder(); + extendedReadBuffers + .visitAll(new IdentityHashStore.Visitor() { + private String prefix = ""; + + @Override + public void accept(ByteBuffer k, Object v) { + builder.append(prefix).append(k); + prefix = ", "; + } + }); + DFSClient.LOG.warn("closing file " + src + ", but there are still " + + "unreleased ByteBuffers allocated by read(). " + + "Please release " + builder.toString() + "."); + } + closeCurrentBlockReaders(); + super.close(); + } finally { + /** + * If dfsInputStream is closed and datanode is in + * DeadNodeDetector#dfsInputStreamNodes, we need remove the datanode from + * the DeadNodeDetector#dfsInputStreamNodes. Since user should not use + * this dfsInputStream anymore. + */ + dfsClient.removeNodeFromDeadNodeDetector(this, locatedBlocks); } - closeCurrentBlockReaders(); - super.close(); } @Override @@ -741,7 +770,8 @@ private synchronized int readBuffer(ReaderStrategy reader, int len, */ sourceFound = seekToBlockSource(pos); } else { - addToDeadNodes(currentNode); + addToLocalDeadNodes(currentNode); + dfsClient.addNodeToDeadNodeDetector(this, currentNode); sourceFound = seekToNewSource(pos); } if (!sourceFound) { @@ -801,7 +831,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy) } blockEnd = -1; if (currentNode != null) { - addToDeadNodes(currentNode); + addToLocalDeadNodes(currentNode); + dfsClient.addNodeToDeadNodeDetector(this, currentNode); } if (--retries == 0) { throw e; @@ -883,7 +914,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block, private LocatedBlock refetchLocations(LocatedBlock block, Collection ignoredNodes) throws IOException { String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), - deadNodes, ignoredNodes); + dfsClient.getDeadNodes(this), ignoredNodes); String blockInfo = block.getBlock() + " file=" + src; if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) { String description = "Could not obtain block: " + blockInfo; @@ -924,7 +955,7 @@ private LocatedBlock refetchLocations(LocatedBlock block, throw new InterruptedIOException( "Interrupted while choosing DataNode for read."); } - deadNodes.clear(); //2nd option is to remove only nodes[blockId] + clearLocalDeadNodes(); //2nd option is to remove only nodes[blockId] openInfo(true); block = refreshLocatedBlock(block); failures++; @@ -945,7 +976,7 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block, StorageType storageType = null; if (nodes != null) { for (int i = 0; i < nodes.length; i++) { - if (!deadNodes.containsKey(nodes[i]) + if (!dfsClient.getDeadNodes(this).containsKey(nodes[i]) && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) { chosenNode = nodes[i]; // Storage types are ordered to correspond with nodes, so use the same @@ -1097,7 +1128,7 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk, DFSClient.LOG.warn(msg); // we want to remember what we have tried corruptedBlocks.addCorruptedBlock(block.getBlock(), datanode.info); - addToDeadNodes(datanode.info); + addToLocalDeadNodes(datanode.info); throw new IOException(msg); } catch (IOException e) { checkInterrupted(e); @@ -1119,7 +1150,8 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk, String msg = "Failed to connect to " + datanode.addr + " for file " + src + " for block " + block.getBlock() + ":" + e; DFSClient.LOG.warn("Connection failure: " + msg, e); - addToDeadNodes(datanode.info); + addToLocalDeadNodes(datanode.info); + dfsClient.addNodeToDeadNodeDetector(this, datanode.info); throw new IOException(msg); } // Refresh the block for updated tokens in case of token failures or @@ -1522,14 +1554,14 @@ public synchronized boolean seekToNewSource(long targetPos) if (currentNode == null) { return seekToBlockSource(targetPos); } - boolean markedDead = deadNodes.containsKey(currentNode); - addToDeadNodes(currentNode); + boolean markedDead = dfsClient.isDeadNode(this, currentNode); + addToLocalDeadNodes(currentNode); DatanodeInfo oldNode = currentNode; DatanodeInfo newNode = blockSeekTo(targetPos); if (!markedDead) { /* remove it from deadNodes. blockSeekTo could have cleared * deadNodes and added currentNode again. Thats ok. */ - deadNodes.remove(oldNode); + removeFromLocalDeadNodes(oldNode); } if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) { currentNode = newNode; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index cf29791fd4..ba35d51561 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -147,10 +147,6 @@ protected String getSrc() { return src; } - protected DFSClient getDFSClient() { - return dfsClient; - } - protected LocatedBlocks getLocatedBlocks() { return locatedBlocks; } @@ -283,7 +279,7 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock, "block" + block.getBlock(), e); // re-fetch the block in case the block has been moved fetchBlockAt(block.getStartOffset()); - addToDeadNodes(dnInfo.info); + addToLocalDeadNodes(dnInfo.info); } } if (reader != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java new file mode 100644 index 0000000000..1ac29a74e4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DeadNodeDetector.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Detect the dead nodes in advance, and share this information among all the + * DFSInputStreams in the same client. + */ +public class DeadNodeDetector implements Runnable { + public static final Logger LOG = + LoggerFactory.getLogger(DeadNodeDetector.class); + + /** + * Waiting time when DeadNodeDetector happens error. + */ + private static final long ERROR_SLEEP_MS = 5000; + + /** + * Waiting time when DeadNodeDetector's state is idle. + */ + private static final long IDLE_SLEEP_MS = 10000; + + /** + * Client context name. + */ + private String name; + + /** + * Dead nodes shared by all the DFSInputStreams of the client. + */ + private final ConcurrentHashMap deadNodes; + + /** + * Record dead nodes by one DFSInputStream. When dead node is not used by one + * DFSInputStream, remove it from dfsInputStreamNodes#DFSInputStream. If + * DFSInputStream does not include any dead node, remove DFSInputStream from + * dfsInputStreamNodes. + */ + private final ConcurrentHashMap> + dfsInputStreamNodes; + + /** + * The state of DeadNodeDetector. + */ + private enum State { + INIT, CHECK_DEAD, IDLE, ERROR + } + + private State state; + + public DeadNodeDetector(String name) { + this.deadNodes = new ConcurrentHashMap(); + this.dfsInputStreamNodes = + new ConcurrentHashMap>(); + this.name = name; + + LOG.info("Start dead node detector for DFSClient {}.", this.name); + state = State.INIT; + } + + @Override + public void run() { + while (true) { + clearAndGetDetectedDeadNodes(); + LOG.debug("Current detector state {}, the detected nodes: {}.", state, + deadNodes.values()); + switch (state) { + case INIT: + init(); + break; + case IDLE: + idle(); + break; + case ERROR: + try { + Thread.sleep(ERROR_SLEEP_MS); + } catch (InterruptedException e) { + } + return; + default: + break; + } + } + } + + private void idle() { + try { + Thread.sleep(IDLE_SLEEP_MS); + } catch (InterruptedException e) { + + } + + state = State.IDLE; + } + + private void init() { + state = State.IDLE; + } + + private void addToDead(DatanodeInfo datanodeInfo) { + deadNodes.put(datanodeInfo.getDatanodeUuid(), datanodeInfo); + } + + public boolean isDeadNode(DatanodeInfo datanodeInfo) { + return deadNodes.containsKey(datanodeInfo.getDatanodeUuid()); + } + + /** + * Add datanode in deadNodes and dfsInputStreamNodes. The node is considered + * to dead node. The dead node is shared by all the DFSInputStreams in the + * same client. + */ + public synchronized void addNodeToDetect(DFSInputStream dfsInputStream, + DatanodeInfo datanodeInfo) { + HashSet datanodeInfos = + dfsInputStreamNodes.get(dfsInputStream); + if (datanodeInfos == null) { + datanodeInfos = new HashSet(); + datanodeInfos.add(datanodeInfo); + dfsInputStreamNodes.putIfAbsent(dfsInputStream, datanodeInfos); + } else { + datanodeInfos.add(datanodeInfo); + } + + addToDead(datanodeInfo); + } + + /** + * Remove dead node which is not used by any DFSInputStream from deadNodes. + * @return new dead node shared by all DFSInputStreams. + */ + public synchronized Set clearAndGetDetectedDeadNodes() { + // remove the dead nodes who doesn't have any inputstream first + Set newDeadNodes = new HashSet(); + for (HashSet datanodeInfos : dfsInputStreamNodes.values()) { + newDeadNodes.addAll(datanodeInfos); + } + + for (DatanodeInfo datanodeInfo : deadNodes.values()) { + if (!newDeadNodes.contains(datanodeInfo)) { + deadNodes.remove(datanodeInfo.getDatanodeUuid()); + } + } + return new HashSet<>(deadNodes.values()); + } + + /** + * Remove dead node from dfsInputStreamNodes#dfsInputStream. If + * dfsInputStreamNodes#dfsInputStream does not contain any dead node, remove + * it from dfsInputStreamNodes. + */ + public synchronized void removeNodeFromDeadNodeDetector( + DFSInputStream dfsInputStream, DatanodeInfo datanodeInfo) { + Set datanodeInfos = dfsInputStreamNodes.get(dfsInputStream); + if (datanodeInfos != null) { + datanodeInfos.remove(datanodeInfo); + if (datanodeInfos.isEmpty()) { + dfsInputStreamNodes.remove(dfsInputStream); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index d1ca63d8ab..e32a3bb40f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -152,6 +152,10 @@ public interface HdfsClientConfigKeys { "dfs.client.block.reader.remote.buffer.size"; int DFS_CLIENT_BLOCK_READER_REMOTE_BUFFER_SIZE_DEFAULT = 8192; + String DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY = + "dfs.client.deadnode.detection.enabled"; + boolean DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT = false; + String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 04bdfe4796..804375ccff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -26,14 +26,19 @@ import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.util.DataChecksum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; @@ -47,6 +52,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_KEY_PROVIDER_CACHE_EXPIRY_DEFAULT; @@ -61,8 +68,6 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIRY_MSEC_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_SEND_BUFFER_SIZE_KEY; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT; -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; @@ -71,6 +76,8 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_DISABLE_INTERVAL_SECOND_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DOMAIN_SOCKET_PATH_DEFAULT; @@ -87,11 +94,6 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * DFSClient configuration. */ @@ -145,6 +147,8 @@ public class DfsClientConf { private final boolean dataTransferTcpNoDelay; + private final boolean deadNodeDetectionEnabled; + public DfsClientConf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout hdfsTimeout = Client.getRpcTimeout(conf); @@ -262,6 +266,10 @@ public DfsClientConf(Configuration conf) { HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); + deadNodeDetectionEnabled = + conf.getBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, + DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_DEFAULT); + stripedReadThreadpoolSize = conf.getInt( HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY, HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT); @@ -595,6 +603,13 @@ public int getStripedReadThreadpoolSize() { return stripedReadThreadpoolSize; } + /** + * @return the deadNodeDetectionEnabled + */ + public boolean isDeadNodeDetectionEnabled() { + return deadNodeDetectionEnabled; + } + /** * @return the replicaAccessorBuilderClasses */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7effbd0faa..d2103fbc10 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2987,6 +2987,15 @@ + + dfs.client.deadnode.detection.enabled + false + + Set to true to enable dead node detection in client side. Then all the DFSInputStreams of the same client can + share the dead node information. + + + dfs.namenode.lease-recheck-interval-ms 2000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java new file mode 100644 index 0000000000..9b997ab732 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDeadNodeDetection.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY; +import static org.junit.Assert.assertEquals; + +/** + * Tests for dead node detection in DFSClient. + */ +public class TestDeadNodeDetection { + + private MiniDFSCluster cluster; + private Configuration conf; + + @Before + public void setUp() { + cluster = null; + conf = new HdfsConfiguration(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testDeadNodeDetectionInBackground() throws IOException { + conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true); + // We'll be using a 512 bytes block size just for tests + // so making sure the checksum bytes too match it. + conf.setInt("io.bytes.per.checksum", 512); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("/testDetectDeadNodeInBackground"); + + // 256 bytes data chunk for writes + byte[] bytes = new byte[256]; + for (int index = 0; index < bytes.length; index++) { + bytes[index] = '0'; + } + + // File with a 512 bytes block size + FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512); + + // Write a block to all 3 DNs (2x256bytes). + out.write(bytes); + out.write(bytes); + out.hflush(); + out.close(); + + // Remove three DNs, + cluster.stopDataNode(0); + cluster.stopDataNode(0); + cluster.stopDataNode(0); + + FSDataInputStream in = fs.open(filePath); + DFSInputStream din = null; + DFSClient dfsClient = null; + try { + try { + in.read(); + } catch (BlockMissingException e) { + } + + din = (DFSInputStream) in.getWrappedStream(); + dfsClient = din.getDFSClient(); + assertEquals(3, dfsClient.getDeadNodes(din).size()); + assertEquals(3, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } finally { + in.close(); + fs.delete(new Path("/testDetectDeadNodeInBackground"), true); + // check the dead node again here, the dead node is expected be removed + assertEquals(0, dfsClient.getDeadNodes(din).size()); + assertEquals(0, dfsClient.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } + } + + @Test + public void testDeadNodeDetectionInMultipleDFSInputStream() + throws IOException { + conf.setBoolean(DFS_CLIENT_DEAD_NODE_DETECTION_ENABLED_KEY, true); + // We'll be using a 512 bytes block size just for tests + // so making sure the checksum bytes too match it. + conf.setInt("io.bytes.per.checksum", 512); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + + FileSystem fs = cluster.getFileSystem(); + Path filePath = new Path("/testDeadNodeMultipleDFSInputStream"); + + // 256 bytes data chunk for writes + byte[] bytes = new byte[256]; + for (int index = 0; index < bytes.length; index++) { + bytes[index] = '0'; + } + + // File with a 512 bytes block size + FSDataOutputStream out = fs.create(filePath, true, 4096, (short) 3, 512); + + // Write a block to DN (2x256bytes). + out.write(bytes); + out.write(bytes); + out.hflush(); + out.close(); + + String datanodeUuid = cluster.getDataNodes().get(0).getDatanodeUuid(); + FSDataInputStream in1 = fs.open(filePath); + DFSInputStream din1 = (DFSInputStream) in1.getWrappedStream(); + DFSClient dfsClient1 = din1.getDFSClient(); + cluster.stopDataNode(0); + + FSDataInputStream in2 = fs.open(filePath); + DFSInputStream din2 = null; + DFSClient dfsClient2 = null; + try { + try { + in1.read(); + } catch (BlockMissingException e) { + } + + din2 = (DFSInputStream) in1.getWrappedStream(); + dfsClient2 = din2.getDFSClient(); + assertEquals(1, dfsClient1.getDeadNodes(din1).size()); + assertEquals(1, dfsClient2.getDeadNodes(din2).size()); + assertEquals(1, dfsClient1.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + assertEquals(1, dfsClient2.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + // check the dn uuid of dead node to see if its expected dead node + assertEquals(datanodeUuid, + ((DatanodeInfo) dfsClient1.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid()); + assertEquals(datanodeUuid, + ((DatanodeInfo) dfsClient2.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().toArray()[0]).getDatanodeUuid()); + } finally { + in1.close(); + in2.close(); + fs.delete(new Path("/testDeadNodeMultipleDFSInputStream"), true); + // check the dead node again here, the dead node is expected be removed + assertEquals(0, dfsClient1.getDeadNodes(din1).size()); + assertEquals(0, dfsClient2.getDeadNodes(din2).size()); + assertEquals(0, dfsClient1.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + assertEquals(0, dfsClient2.getClientContext().getDeadNodeDetector() + .clearAndGetDetectedDeadNodes().size()); + } + } +}