HDFS-6569. OOB message can't be sent to the client when DataNode shuts down for upgrade. Contributed by Brandon Li
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1b5139b29a
commit
2fb04d2a30
@ -517,6 +517,9 @@ Release 2.6.0 - UNRELEASED
|
||||
HDFS-6825. Edit log corruption due to delayed block removal.
|
||||
(Yongjun Zhang via wang)
|
||||
|
||||
HDFS-6569. OOB message can't be sent to the client when DataNode shuts down for upgrade
|
||||
(brandonli)
|
||||
|
||||
Release 2.5.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -738,7 +738,12 @@ class BlockReceiver implements Closeable {
|
||||
LOG.warn("Error managing cache for writer of block " + block, t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void sendOOB() throws IOException, InterruptedException {
|
||||
((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck
|
||||
.getRestartOOBStatus());
|
||||
}
|
||||
|
||||
void receiveBlock(
|
||||
DataOutputStream mirrOut, // output to next datanode
|
||||
DataInputStream mirrIn, // input from next datanode
|
||||
@ -830,9 +835,7 @@ class BlockReceiver implements Closeable {
|
||||
// The worst case is not recovering this RBW replica.
|
||||
// Client will fall back to regular pipeline recovery.
|
||||
}
|
||||
try {
|
||||
((PacketResponder) responder.getRunnable()).
|
||||
sendOOBResponse(PipelineAck.getRestartOOBStatus());
|
||||
try {
|
||||
// Even if the connection is closed after the ack packet is
|
||||
// flushed, the client can react to the connection closure
|
||||
// first. Insert a delay to lower the chance of client
|
||||
@ -840,8 +843,6 @@ class BlockReceiver implements Closeable {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException ie) {
|
||||
// It is already going down. Ignore this.
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Error sending OOB Ack.", ioe);
|
||||
}
|
||||
}
|
||||
responder.interrupt();
|
||||
|
@ -270,6 +270,7 @@ public class DataNode extends Configured
|
||||
public final static String EMPTY_DEL_HINT = "";
|
||||
final AtomicInteger xmitsInProgress = new AtomicInteger();
|
||||
Daemon dataXceiverServer = null;
|
||||
DataXceiverServer xserver = null;
|
||||
Daemon localDataXceiverServer = null;
|
||||
ShortCircuitRegistry shortCircuitRegistry = null;
|
||||
ThreadGroup threadGroup = null;
|
||||
@ -649,8 +650,8 @@ public class DataNode extends Configured
|
||||
streamingAddr = tcpPeerServer.getStreamingAddr();
|
||||
LOG.info("Opened streaming server at " + streamingAddr);
|
||||
this.threadGroup = new ThreadGroup("dataXceiverServer");
|
||||
this.dataXceiverServer = new Daemon(threadGroup,
|
||||
new DataXceiverServer(tcpPeerServer, conf, this));
|
||||
xserver = new DataXceiverServer(tcpPeerServer, conf, this);
|
||||
this.dataXceiverServer = new Daemon(threadGroup, xserver);
|
||||
this.threadGroup.setDaemon(true); // auto destroy when empty
|
||||
|
||||
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
|
||||
@ -1137,6 +1138,11 @@ public class DataNode extends Configured
|
||||
dataNodeInfoBeanName = MBeans.register("DataNode", "DataNodeInfo", this);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public DataXceiverServer getXferServer() {
|
||||
return xserver;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public int getXferPort() {
|
||||
return streamingAddr.getPort();
|
||||
@ -1395,6 +1401,7 @@ public class DataNode extends Configured
|
||||
// in order to avoid any further acceptance of requests, but the peers
|
||||
// for block writes are not closed until the clients are notified.
|
||||
if (dataXceiverServer != null) {
|
||||
xserver.sendOOBToPeers();
|
||||
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
|
||||
this.dataXceiverServer.interrupt();
|
||||
}
|
||||
|
@ -103,7 +103,8 @@ class DataXceiver extends Receiver implements Runnable {
|
||||
private long opStartTime; //the start time of receiving an Op
|
||||
private final InputStream socketIn;
|
||||
private OutputStream socketOut;
|
||||
|
||||
private BlockReceiver blockReceiver = null;
|
||||
|
||||
/**
|
||||
* Client Name used in previous operation. Not available on first request
|
||||
* on the socket.
|
||||
@ -159,6 +160,12 @@ class DataXceiver extends Receiver implements Runnable {
|
||||
return socketOut;
|
||||
}
|
||||
|
||||
public void sendOOB() throws IOException, InterruptedException {
|
||||
LOG.info("Sending OOB to peer: " + peer);
|
||||
if(blockReceiver!=null)
|
||||
blockReceiver.sendOOB();
|
||||
}
|
||||
|
||||
/**
|
||||
* Read/write data from/to the DataXceiverServer.
|
||||
*/
|
||||
@ -168,7 +175,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||
Op op = null;
|
||||
|
||||
try {
|
||||
dataXceiverServer.addPeer(peer, Thread.currentThread());
|
||||
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
|
||||
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
|
||||
InputStream input = socketIn;
|
||||
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
|
||||
@ -584,7 +591,6 @@ class DataXceiver extends Receiver implements Runnable {
|
||||
DataOutputStream mirrorOut = null; // stream to next target
|
||||
DataInputStream mirrorIn = null; // reply from next target
|
||||
Socket mirrorSock = null; // socket to next target
|
||||
BlockReceiver blockReceiver = null; // responsible for data handling
|
||||
String mirrorNode = null; // the name:port of next target
|
||||
String firstBadLink = ""; // first datanode that failed in connection setup
|
||||
Status mirrorInStatus = SUCCESS;
|
||||
@ -747,6 +753,7 @@ class DataXceiver extends Receiver implements Runnable {
|
||||
IOUtils.closeStream(replyOut);
|
||||
IOUtils.closeSocket(mirrorSock);
|
||||
IOUtils.closeStream(blockReceiver);
|
||||
blockReceiver = null;
|
||||
}
|
||||
|
||||
//update metrics
|
||||
|
@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.net.Peer;
|
||||
import org.apache.hadoop.hdfs.net.PeerServer;
|
||||
import org.apache.hadoop.hdfs.server.balancer.Balancer;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Server used for receiving/sending a block of data.
|
||||
@ -45,6 +45,7 @@ class DataXceiverServer implements Runnable {
|
||||
private final PeerServer peerServer;
|
||||
private final DataNode datanode;
|
||||
private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
|
||||
private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
|
||||
private boolean closed = false;
|
||||
|
||||
/**
|
||||
@ -217,18 +218,38 @@ class DataXceiverServer implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void addPeer(Peer peer, Thread t) throws IOException {
|
||||
synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
|
||||
throws IOException {
|
||||
if (closed) {
|
||||
throw new IOException("Server closed.");
|
||||
}
|
||||
peers.put(peer, t);
|
||||
peersXceiver.put(peer, xceiver);
|
||||
}
|
||||
|
||||
synchronized void closePeer(Peer peer) {
|
||||
peers.remove(peer);
|
||||
peersXceiver.remove(peer);
|
||||
IOUtils.cleanup(null, peer);
|
||||
}
|
||||
|
||||
// Sending OOB to all peers
|
||||
public synchronized void sendOOBToPeers() {
|
||||
if (!datanode.shutdownForUpgrade) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (Peer p : peers.keySet()) {
|
||||
try {
|
||||
peersXceiver.get(p).sendOOB();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Got error when sending OOB message.", e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Interrupted when sending OOB message.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Notify all peers of the shutdown and restart.
|
||||
// datanode.shouldRun should still be true and datanode.restarting should
|
||||
// be set true before calling this method.
|
||||
@ -247,6 +268,7 @@ class DataXceiverServer implements Runnable {
|
||||
IOUtils.cleanup(LOG, p);
|
||||
}
|
||||
peers.clear();
|
||||
peersXceiver.clear();
|
||||
}
|
||||
|
||||
// Return the number of peers.
|
||||
@ -254,7 +276,14 @@ class DataXceiverServer implements Runnable {
|
||||
return peers.size();
|
||||
}
|
||||
|
||||
// Return the number of peers and DataXceivers.
|
||||
@VisibleForTesting
|
||||
synchronized int getNumPeersXceiver() {
|
||||
return peersXceiver.size();
|
||||
}
|
||||
|
||||
synchronized void releasePeer(Peer peer) {
|
||||
peers.remove(peer);
|
||||
peersXceiver.remove(peer);
|
||||
}
|
||||
}
|
||||
|
@ -27,11 +27,14 @@ import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSClient;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
@ -67,6 +70,7 @@ public class TestDataNodeRollingUpgrade {
|
||||
|
||||
private void startCluster() throws IOException {
|
||||
conf = new HdfsConfiguration();
|
||||
conf.setInt("dfs.blocksize", 1024*1024);
|
||||
cluster = new Builder(conf).numDataNodes(REPL_FACTOR).build();
|
||||
cluster.waitActive();
|
||||
fs = cluster.getFileSystem();
|
||||
@ -243,4 +247,48 @@ public class TestDataNodeRollingUpgrade {
|
||||
shutdownCluster();
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout=600000)
|
||||
// Test DatanodeXceiver has correct peer-dataxceiver pairs for sending OOB message
|
||||
public void testDatanodePeersXceiver() throws Exception {
|
||||
try {
|
||||
startCluster();
|
||||
|
||||
// Create files in DFS.
|
||||
String testFile1 = "/TestDataNodeXceiver1.dat";
|
||||
String testFile2 = "/TestDataNodeXceiver2.dat";
|
||||
String testFile3 = "/TestDataNodeXceiver3.dat";
|
||||
|
||||
DFSClient client1 = new DFSClient(NameNode.getAddress(conf), conf);
|
||||
DFSClient client2 = new DFSClient(NameNode.getAddress(conf), conf);
|
||||
DFSClient client3 = new DFSClient(NameNode.getAddress(conf), conf);
|
||||
|
||||
DFSOutputStream s1 = (DFSOutputStream) client1.create(testFile1, true);
|
||||
DFSOutputStream s2 = (DFSOutputStream) client2.create(testFile2, true);
|
||||
DFSOutputStream s3 = (DFSOutputStream) client3.create(testFile3, true);
|
||||
|
||||
byte[] toWrite = new byte[1024*1024*8];
|
||||
Random rb = new Random(1111);
|
||||
rb.nextBytes(toWrite);
|
||||
s1.write(toWrite, 0, 1024*1024*8);
|
||||
s1.flush();
|
||||
s2.write(toWrite, 0, 1024*1024*8);
|
||||
s2.flush();
|
||||
s3.write(toWrite, 0, 1024*1024*8);
|
||||
s3.flush();
|
||||
|
||||
assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer()
|
||||
.getNumPeersXceiver());
|
||||
s1.close();
|
||||
s2.close();
|
||||
s3.close();
|
||||
assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer()
|
||||
.getNumPeersXceiver());
|
||||
client1.close();
|
||||
client2.close();
|
||||
client3.close();
|
||||
} finally {
|
||||
shutdownCluster();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user