HADOOP-7346. Send back nicer error message to clients using outdated IPC version. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1131254 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1e8c213e18
commit
90a9262611
@ -512,6 +512,9 @@ Release 0.22.0 - Unreleased
|
||||
HADOOP-7355 Add audience and stability annotations to HttpServer class
|
||||
(stack)
|
||||
|
||||
HADOOP-7346. Send back nicer error message to clients using outdated IPC
|
||||
version. (todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-6884. Add LOG.isDebugEnabled() guard for each LOG.debug(..).
|
||||
|
@ -67,6 +67,7 @@
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.ipc.RPC.VersionMismatch;
|
||||
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
|
||||
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
@ -1172,6 +1173,7 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
hostAddress + ":" + remotePort +
|
||||
" got version " + version +
|
||||
" expected version " + CURRENT_VERSION);
|
||||
setupBadVersionResponse(version);
|
||||
return -1;
|
||||
}
|
||||
dataLengthBuffer.clear();
|
||||
@ -1246,6 +1248,40 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to set up the response to indicate that the client version
|
||||
* is incompatible with the server. This can contain special-case
|
||||
* code to speak enough of past IPC protocols to pass back
|
||||
* an exception to the caller.
|
||||
* @param clientVersion the version the caller is using
|
||||
* @throws IOException
|
||||
*/
|
||||
private void setupBadVersionResponse(int clientVersion) throws IOException {
|
||||
String errMsg = "Server IPC version " + CURRENT_VERSION +
|
||||
" cannot communicate with client version " + clientVersion;
|
||||
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
|
||||
|
||||
if (clientVersion >= 3) {
|
||||
Call fakeCall = new Call(-1, null, this);
|
||||
// Versions 3 and greater can interpret this exception
|
||||
// response in the same manner
|
||||
setupResponse(buffer, fakeCall, Status.FATAL,
|
||||
null, VersionMismatch.class.getName(), errMsg);
|
||||
|
||||
responder.doRespond(fakeCall);
|
||||
} else if (clientVersion == 2) { // Hadoop 0.18.3
|
||||
Call fakeCall = new Call(0, null, this);
|
||||
DataOutputStream out = new DataOutputStream(buffer);
|
||||
out.writeInt(0); // call ID
|
||||
out.writeBoolean(true); // error
|
||||
WritableUtils.writeString(out, VersionMismatch.class.getName());
|
||||
WritableUtils.writeString(out, errMsg);
|
||||
fakeCall.setResponse(ByteBuffer.wrap(buffer.toByteArray()));
|
||||
|
||||
responder.doRespond(fakeCall);
|
||||
}
|
||||
}
|
||||
|
||||
/// Reads the connection header following version
|
||||
private void processHeader(byte[] buf) throws IOException {
|
||||
DataInputStream in =
|
||||
|
@ -20,17 +20,22 @@
|
||||
|
||||
import org.apache.commons.logging.*;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
|
||||
import java.util.Random;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInput;
|
||||
import java.io.File;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketTimeoutException;
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
@ -41,6 +46,9 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.junit.Assume;
|
||||
|
||||
import com.google.common.primitives.Bytes;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
/** Unit tests for IPC. */
|
||||
public class TestIPC {
|
||||
public static final Log LOG =
|
||||
@ -507,6 +515,183 @@ private long countOpenFileDescriptors() {
|
||||
return FD_DIR.list().length;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIpcFromHadoop_0_18_13() throws Exception {
|
||||
doIpcVersionTest(NetworkTraces.HADOOP_0_18_3_RPC_DUMP,
|
||||
NetworkTraces.RESPONSE_TO_HADOOP_0_18_3_RPC);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIpcFromHadoop0_20_3() throws Exception {
|
||||
doIpcVersionTest(NetworkTraces.HADOOP_0_20_3_RPC_DUMP,
|
||||
NetworkTraces.RESPONSE_TO_HADOOP_0_20_3_RPC);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIpcFromHadoop0_21_0() throws Exception {
|
||||
doIpcVersionTest(NetworkTraces.HADOOP_0_21_0_RPC_DUMP,
|
||||
NetworkTraces.RESPONSE_TO_HADOOP_0_21_0_RPC);
|
||||
}
|
||||
|
||||
private void doIpcVersionTest(
|
||||
byte[] requestData,
|
||||
byte[] expectedResponse) throws Exception {
|
||||
Server server = new TestServer(1, true);
|
||||
InetSocketAddress addr = NetUtils.getConnectAddress(server);
|
||||
server.start();
|
||||
Socket socket = new Socket();
|
||||
|
||||
try {
|
||||
NetUtils.connect(socket, addr, 5000);
|
||||
|
||||
OutputStream out = socket.getOutputStream();
|
||||
InputStream in = socket.getInputStream();
|
||||
out.write(requestData, 0, requestData.length);
|
||||
out.flush();
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
IOUtils.copyBytes(in, baos, 256);
|
||||
|
||||
byte[] responseData = baos.toByteArray();
|
||||
|
||||
assertEquals(
|
||||
StringUtils.byteToHexString(expectedResponse),
|
||||
StringUtils.byteToHexString(responseData));
|
||||
} finally {
|
||||
IOUtils.closeSocket(socket);
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a string of lines that look like:
|
||||
* "68 72 70 63 02 00 00 00 82 00 1d 6f 72 67 2e 61 hrpc.... ...org.a"
|
||||
* .. into an array of bytes.
|
||||
*/
|
||||
private static byte[] hexDumpToBytes(String hexdump) {
|
||||
final int LAST_HEX_COL = 3 * 16;
|
||||
|
||||
StringBuilder hexString = new StringBuilder();
|
||||
|
||||
for (String line : hexdump.toUpperCase().split("\n")) {
|
||||
hexString.append(line.substring(0, LAST_HEX_COL).replace(" ", ""));
|
||||
}
|
||||
return StringUtils.hexStringToByte(hexString.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Wireshark traces collected from various client versions. These enable
|
||||
* us to test that old versions of the IPC stack will receive the correct
|
||||
* responses so that they will throw a meaningful error message back
|
||||
* to the user.
|
||||
*/
|
||||
private static abstract class NetworkTraces {
|
||||
/**
|
||||
* Wireshark dump of an RPC request from Hadoop 0.18.3
|
||||
*/
|
||||
final static byte[] HADOOP_0_18_3_RPC_DUMP =
|
||||
hexDumpToBytes(
|
||||
"68 72 70 63 02 00 00 00 82 00 1d 6f 72 67 2e 61 hrpc.... ...org.a\n" +
|
||||
"70 61 63 68 65 2e 68 61 64 6f 6f 70 2e 69 6f 2e pache.ha doop.io.\n" +
|
||||
"57 72 69 74 61 62 6c 65 00 30 6f 72 67 2e 61 70 Writable .0org.ap\n" +
|
||||
"61 63 68 65 2e 68 61 64 6f 6f 70 2e 69 6f 2e 4f ache.had oop.io.O\n" +
|
||||
"62 6a 65 63 74 57 72 69 74 61 62 6c 65 24 4e 75 bjectWri table$Nu\n" +
|
||||
"6c 6c 49 6e 73 74 61 6e 63 65 00 2f 6f 72 67 2e llInstan ce./org.\n" +
|
||||
"61 70 61 63 68 65 2e 68 61 64 6f 6f 70 2e 73 65 apache.h adoop.se\n" +
|
||||
"63 75 72 69 74 79 2e 55 73 65 72 47 72 6f 75 70 curity.U serGroup\n" +
|
||||
"49 6e 66 6f 72 6d 61 74 69 6f 6e 00 00 00 6c 00 Informat ion...l.\n" +
|
||||
"00 00 00 00 12 67 65 74 50 72 6f 74 6f 63 6f 6c .....get Protocol\n" +
|
||||
"56 65 72 73 69 6f 6e 00 00 00 02 00 10 6a 61 76 Version. .....jav\n" +
|
||||
"61 2e 6c 61 6e 67 2e 53 74 72 69 6e 67 00 2e 6f a.lang.S tring..o\n" +
|
||||
"72 67 2e 61 70 61 63 68 65 2e 68 61 64 6f 6f 70 rg.apach e.hadoop\n" +
|
||||
"2e 6d 61 70 72 65 64 2e 4a 6f 62 53 75 62 6d 69 .mapred. JobSubmi\n" +
|
||||
"73 73 69 6f 6e 50 72 6f 74 6f 63 6f 6c 00 04 6c ssionPro tocol..l\n" +
|
||||
"6f 6e 67 00 00 00 00 00 00 00 0a ong..... ... \n");
|
||||
|
||||
final static String HADOOP0_18_ERROR_MSG =
|
||||
"Server IPC version " + Server.CURRENT_VERSION +
|
||||
" cannot communicate with client version 2";
|
||||
|
||||
/**
|
||||
* Wireshark dump of the correct response that triggers an error message
|
||||
* on an 0.18.3 client.
|
||||
*/
|
||||
final static byte[] RESPONSE_TO_HADOOP_0_18_3_RPC =
|
||||
Bytes.concat(hexDumpToBytes(
|
||||
"00 00 00 00 01 00 00 00 29 6f 72 67 2e 61 70 61 ........ )org.apa\n" +
|
||||
"63 68 65 2e 68 61 64 6f 6f 70 2e 69 70 63 2e 52 che.hado op.ipc.R\n" +
|
||||
"50 43 24 56 65 72 73 69 6f 6e 4d 69 73 6d 61 74 PC$Versi onMismat\n" +
|
||||
"63 68 ch \n"),
|
||||
Ints.toByteArray(HADOOP0_18_ERROR_MSG.length()),
|
||||
HADOOP0_18_ERROR_MSG.getBytes());
|
||||
|
||||
/**
|
||||
* Wireshark dump of an RPC request from Hadoop 0.20.3
|
||||
*/
|
||||
final static byte[] HADOOP_0_20_3_RPC_DUMP =
|
||||
hexDumpToBytes(
|
||||
"68 72 70 63 03 00 00 00 79 27 6f 72 67 2e 61 70 hrpc.... y'org.ap\n" +
|
||||
"61 63 68 65 2e 68 61 64 6f 6f 70 2e 69 70 63 2e ache.had oop.ipc.\n" +
|
||||
"56 65 72 73 69 6f 6e 65 64 50 72 6f 74 6f 63 6f Versione dProtoco\n" +
|
||||
"6c 01 0a 53 54 52 49 4e 47 5f 55 47 49 04 74 6f l..STRIN G_UGI.to\n" +
|
||||
"64 64 09 04 74 6f 64 64 03 61 64 6d 07 64 69 61 dd..todd .adm.dia\n" +
|
||||
"6c 6f 75 74 05 63 64 72 6f 6d 07 70 6c 75 67 64 lout.cdr om.plugd\n" +
|
||||
"65 76 07 6c 70 61 64 6d 69 6e 05 61 64 6d 69 6e ev.lpadm in.admin\n" +
|
||||
"0a 73 61 6d 62 61 73 68 61 72 65 06 6d 72 74 65 .sambash are.mrte\n" +
|
||||
"73 74 00 00 00 6c 00 00 00 00 00 12 67 65 74 50 st...l.. ....getP\n" +
|
||||
"72 6f 74 6f 63 6f 6c 56 65 72 73 69 6f 6e 00 00 rotocolV ersion..\n" +
|
||||
"00 02 00 10 6a 61 76 61 2e 6c 61 6e 67 2e 53 74 ....java .lang.St\n" +
|
||||
"72 69 6e 67 00 2e 6f 72 67 2e 61 70 61 63 68 65 ring..or g.apache\n" +
|
||||
"2e 68 61 64 6f 6f 70 2e 6d 61 70 72 65 64 2e 4a .hadoop. mapred.J\n" +
|
||||
"6f 62 53 75 62 6d 69 73 73 69 6f 6e 50 72 6f 74 obSubmis sionProt\n" +
|
||||
"6f 63 6f 6c 00 04 6c 6f 6e 67 00 00 00 00 00 00 ocol..lo ng......\n" +
|
||||
"00 14 .. \n");
|
||||
|
||||
final static String HADOOP0_20_ERROR_MSG =
|
||||
"Server IPC version " + Server.CURRENT_VERSION +
|
||||
" cannot communicate with client version 3";
|
||||
|
||||
|
||||
final static byte[] RESPONSE_TO_HADOOP_0_20_3_RPC =
|
||||
Bytes.concat(hexDumpToBytes(
|
||||
"ff ff ff ff ff ff ff ff 00 00 00 29 6f 72 67 2e ........ ...)org.\n" +
|
||||
"61 70 61 63 68 65 2e 68 61 64 6f 6f 70 2e 69 70 apache.h adoop.ip\n" +
|
||||
"63 2e 52 50 43 24 56 65 72 73 69 6f 6e 4d 69 73 c.RPC$Ve rsionMis\n" +
|
||||
"6d 61 74 63 68 match \n"),
|
||||
Ints.toByteArray(HADOOP0_20_ERROR_MSG.length()),
|
||||
HADOOP0_20_ERROR_MSG.getBytes());
|
||||
|
||||
|
||||
final static String HADOOP0_21_ERROR_MSG =
|
||||
"Server IPC version " + Server.CURRENT_VERSION +
|
||||
" cannot communicate with client version 4";
|
||||
|
||||
final static byte[] HADOOP_0_21_0_RPC_DUMP =
|
||||
hexDumpToBytes(
|
||||
"68 72 70 63 04 50 hrpc.P" +
|
||||
// in 0.21 it comes in two separate TCP packets
|
||||
"00 00 00 3c 33 6f 72 67 2e 61 70 61 63 68 65 2e ...<3org .apache.\n" +
|
||||
"68 61 64 6f 6f 70 2e 6d 61 70 72 65 64 75 63 65 hadoop.m apreduce\n" +
|
||||
"2e 70 72 6f 74 6f 63 6f 6c 2e 43 6c 69 65 6e 74 .protoco l.Client\n" +
|
||||
"50 72 6f 74 6f 63 6f 6c 01 00 04 74 6f 64 64 00 Protocol ...todd.\n" +
|
||||
"00 00 00 71 00 00 00 00 00 12 67 65 74 50 72 6f ...q.... ..getPro\n" +
|
||||
"74 6f 63 6f 6c 56 65 72 73 69 6f 6e 00 00 00 02 tocolVer sion....\n" +
|
||||
"00 10 6a 61 76 61 2e 6c 61 6e 67 2e 53 74 72 69 ..java.l ang.Stri\n" +
|
||||
"6e 67 00 33 6f 72 67 2e 61 70 61 63 68 65 2e 68 ng.3org. apache.h\n" +
|
||||
"61 64 6f 6f 70 2e 6d 61 70 72 65 64 75 63 65 2e adoop.ma preduce.\n" +
|
||||
"70 72 6f 74 6f 63 6f 6c 2e 43 6c 69 65 6e 74 50 protocol .ClientP\n" +
|
||||
"72 6f 74 6f 63 6f 6c 00 04 6c 6f 6e 67 00 00 00 rotocol. .long...\n" +
|
||||
"00 00 00 00 21 ....! \n");
|
||||
|
||||
final static byte[] RESPONSE_TO_HADOOP_0_21_0_RPC =
|
||||
Bytes.concat(hexDumpToBytes(
|
||||
"ff ff ff ff ff ff ff ff 00 00 00 29 6f 72 67 2e ........ ...)org.\n" +
|
||||
"61 70 61 63 68 65 2e 68 61 64 6f 6f 70 2e 69 70 apache.h adoop.ip\n" +
|
||||
"63 2e 52 50 43 24 56 65 72 73 69 6f 6e 4d 69 73 c.RPC$Ve rsionMis\n" +
|
||||
"6d 61 74 63 68 match \n"),
|
||||
Ints.toByteArray(HADOOP0_21_ERROR_MSG.length()),
|
||||
HADOOP0_21_ERROR_MSG.getBytes());
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
//new TestIPC().testSerial(5, false, 2, 10, 1000);
|
||||
|
Loading…
Reference in New Issue
Block a user