diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d1ed85182e..685cd6211d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -500,9 +500,6 @@ Release 2.6.0 - UNRELEASED HDFS-6830. BlockInfo.addStorage fails when DN changes the storage for a block replica (Arpit Agarwal) - HDFS-6847. Avoid timeouts for replaceBlock() call by sending intermediate - responses to Balancer (vinayakumarb) - Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index b7dceb6f48..4a6f96be7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -87,6 +87,8 @@ public class Dispatcher { private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5; private static final long DELAY_AFTER_ERROR = 10 * 1000L; // 10 seconds + private static final int BLOCK_MOVE_READ_TIMEOUT = 20 * 60 * 1000; // 20 + // minutes private final NameNodeConnector nnc; private final SaslDataTransferClient saslClient; @@ -276,6 +278,13 @@ private void dispatch() { sock.connect( NetUtils.createSocketAddr(target.getDatanodeInfo().getXferAddr()), HdfsServerConstants.READ_TIMEOUT); + /* + * Unfortunately we don't have a good way to know if the Datanode is + * taking a really long time to move a block, OR something has gone + * wrong and it's never going to finish. To deal with this scenario, we + * set a long timeout (20 minutes) to avoid hanging indefinitely. + */ + sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); sock.setKeepAlive(true); @@ -332,12 +341,8 @@ private void sendRequest(DataOutputStream out, ExtendedBlock eb, /** Receive a block copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { - BlockOpResponseProto response = - BlockOpResponseProto.parseFrom(vintPrefixed(in)); - while (response.getStatus() == Status.IN_PROGRESS) { - // read intermediate responses - response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); - } + BlockOpResponseProto response = BlockOpResponseProto + .parseFrom(vintPrefixed(in)); if (response.getStatus() != Status.SUCCESS) { if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) { throw new IOException("block move failed due to access token error"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 9dcd006ed5..fb7ecd69e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -45,7 +45,6 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; @@ -124,14 +123,6 @@ class BlockReceiver implements Closeable { private boolean syncOnClose; private long restartBudget; - /** - * for replaceBlock response - */ - private final long responseInterval; - private long lastResponseTime = 0; - private boolean isReplaceBlock = false; - private DataOutputStream replyOut = null; - BlockReceiver(final ExtendedBlock block, final StorageType storageType, final DataInputStream in, final String inAddr, final String myAddr, @@ -153,9 +144,6 @@ class BlockReceiver implements Closeable { this.isClient = !this.isDatanode; this.restartBudget = datanode.getDnConf().restartReplicaExpiry; this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs; - // For replaceBlock() calls response should be sent to avoid socketTimeout - // at clients. So sending with the interval of 0.5 * socketTimeout - this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5); //for datanode, we have //1: clientName.length() == 0, and //2: stage == null or PIPELINE_SETUP_CREATE @@ -663,20 +651,6 @@ private int receivePacket() throws IOException { lastPacketInBlock, offsetInBlock, Status.SUCCESS); } - /* - * Send in-progress responses for the replaceBlock() calls back to caller to - * avoid timeouts due to balancer throttling. HDFS-6247 - */ - if (isReplaceBlock - && (Time.monotonicNow() - lastResponseTime > responseInterval)) { - BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder() - .setStatus(Status.IN_PROGRESS); - response.build().writeDelimitedTo(replyOut); - replyOut.flush(); - - lastResponseTime = Time.monotonicNow(); - } - if (throttler != null) { // throttle I/O throttler.throttle(len); } @@ -744,8 +718,7 @@ void receiveBlock( DataInputStream mirrIn, // input from next datanode DataOutputStream replyOut, // output to previous datanode String mirrAddr, DataTransferThrottler throttlerArg, - DatanodeInfo[] downstreams, - boolean isReplaceBlock) throws IOException { + DatanodeInfo[] downstreams) throws IOException { syncOnClose = datanode.getDnConf().syncOnClose; boolean responderClosed = false; @@ -753,9 +726,6 @@ void receiveBlock( mirrorAddr = mirrAddr; throttler = throttlerArg; - this.replyOut = replyOut; - this.isReplaceBlock = isReplaceBlock; - try { if (isClient && !isTransfer) { responder = new Daemon(datanode.threadGroup, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index 01fe036f1d..5ef6cc7ee2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -708,7 +708,7 @@ public void writeBlock(final ExtendedBlock block, if (blockReceiver != null) { String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, - mirrorAddr, null, targets, false); + mirrorAddr, null, targets); // send close-ack for transfer-RBW/Finalized if (isTransfer) { @@ -983,7 +983,7 @@ public void replaceBlock(final ExtendedBlock block, String errMsg = null; BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; - DataOutputStream replyOut = new DataOutputStream(getOutputStream()); + try { // get the output stream to the proxy final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname); @@ -1040,8 +1040,8 @@ public void replaceBlock(final ExtendedBlock block, CachingStrategy.newDropBehind()); // receive a block - blockReceiver.receiveBlock(null, null, replyOut, null, - dataXceiverServer.balanceThrottler, null, true); + blockReceiver.receiveBlock(null, null, null, null, + dataXceiverServer.balanceThrottler, null); // notify name node datanode.notifyNamenodeReceivedBlock( @@ -1076,7 +1076,6 @@ public void replaceBlock(final ExtendedBlock block, IOUtils.closeStream(proxyOut); IOUtils.closeStream(blockReceiver); IOUtils.closeStream(proxyReply); - IOUtils.closeStream(replyOut); } //update metrics diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index 6283b569dd..9b4ba339d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -207,7 +207,6 @@ enum Status { OOB_RESERVED1 = 9; // Reserved OOB_RESERVED2 = 10; // Reserved OOB_RESERVED3 = 11; // Reserved - IN_PROGRESS = 12; } message PipelineAckProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index e0d79648a8..478b6d1546 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -272,10 +272,8 @@ private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source, // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); - BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(reply); - while (proto.getStatus() == Status.IN_PROGRESS) { - proto = BlockOpResponseProto.parseDelimitedFrom(reply); - } + BlockOpResponseProto proto = + BlockOpResponseProto.parseDelimitedFrom(reply); return proto.getStatus() == Status.SUCCESS; }