MAPREDUCE-5326. Added version to shuffle header. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1496741 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2013-06-26 05:49:31 +00:00
parent b3a8d99817
commit 11bcd2ed12
6 changed files with 151 additions and 24 deletions

View File

@ -320,6 +320,9 @@ Release 2.1.0-beta - UNRELEASED
MAPREDUCE-5194. Heed interrupts during Fetcher shutdown. (cdouglas) MAPREDUCE-5194. Heed interrupts during Fetcher shutdown. (cdouglas)
MAPREDUCE-5326. Added version to shuffle header. (Zhijie Shen via
acmurthy)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -275,6 +275,11 @@ protected void copyFromHost(MapHost host) throws IOException {
SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash); SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
// set the read timeout // set the read timeout
connection.setReadTimeout(readTimeout); connection.setReadTimeout(readTimeout);
// put shuffle version into http header
connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
connect(connection, connectionTimeout); connect(connection, connectionTimeout);
// verify that the thread wasn't stopped during calls to connect // verify that the thread wasn't stopped during calls to connect
if (stopped) { if (stopped) {
@ -290,7 +295,13 @@ protected void copyFromHost(MapHost host) throws IOException {
"Got invalid response code " + rc + " from " + url + "Got invalid response code " + rc + " from " + url +
": " + connection.getResponseMessage()); ": " + connection.getResponseMessage());
} }
// get the shuffle version
if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
|| !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
throw new IOException("Incompatible shuffle response version");
}
// get the replyHash which is HMac of the encHash we sent to the server // get the replyHash which is HMac of the encHash we sent to the server
String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH); String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
if(replyHash==null) { if(replyHash==null) {

View File

@ -36,6 +36,12 @@
@InterfaceStability.Stable @InterfaceStability.Stable
public class ShuffleHeader implements Writable { public class ShuffleHeader implements Writable {
/** Header info of the shuffle http request/response */
public static final String HTTP_HEADER_NAME = "name";
public static final String DEFAULT_HTTP_HEADER_NAME = "mapreduce";
public static final String HTTP_HEADER_VERSION = "version";
public static final String DEFAULT_HTTP_HEADER_VERSION = "1.0.0";
/** /**
* The longest possible length of task attempt id that we will accept. * The longest possible length of task attempt id that we will accept.
*/ */

View File

@ -124,9 +124,8 @@ public void testCopyFromHostConnectionTimeout() throws Exception {
underTest.copyFromHost(host); underTest.copyFromHost(host);
verify(connection) verify(connection).addRequestProperty(
.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
encHash);
verify(allErrs).increment(1); verify(allErrs).increment(1);
verify(ss).copyFailed(map1ID, host, false, false); verify(ss).copyFailed(map1ID, host, false, false);
@ -144,6 +143,10 @@ public void testCopyFromHostBogusHeader() throws Exception {
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200); when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
.thenReturn(replyHash); .thenReturn(replyHash);
ByteArrayInputStream in = new ByteArrayInputStream( ByteArrayInputStream in = new ByteArrayInputStream(
@ -152,9 +155,8 @@ public void testCopyFromHostBogusHeader() throws Exception {
underTest.copyFromHost(host); underTest.copyFromHost(host);
verify(connection) verify(connection).addRequestProperty(
.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
encHash);
verify(allErrs).increment(1); verify(allErrs).increment(1);
verify(ss).copyFailed(map1ID, host, true, false); verify(ss).copyFailed(map1ID, host, true, false);
@ -164,6 +166,37 @@ public void testCopyFromHostBogusHeader() throws Exception {
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID)); verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
} }
@Test
public void testCopyFromHostIncompatibleShuffleVersion() throws Exception {
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn("mapreduce").thenReturn("other").thenReturn("other");
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn("1.0.1").thenReturn("1.0.0").thenReturn("1.0.1");
when(connection.getHeaderField(
SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)).thenReturn(replyHash);
ByteArrayInputStream in = new ByteArrayInputStream(new byte[0]);
when(connection.getInputStream()).thenReturn(in);
for (int i = 0; i < 3; ++i) {
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
r, metrics, except, key, connection);
underTest.copyFromHost(host);
}
verify(connection, times(3)).addRequestProperty(
SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
verify(allErrs, times(3)).increment(1);
verify(ss, times(3)).copyFailed(map1ID, host, false, false);
verify(ss, times(3)).copyFailed(map2ID, host, false, false);
verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
verify(ss, times(3)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
}
@Test @Test
public void testCopyFromHostWait() throws Exception { public void testCopyFromHostWait() throws Exception {
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm, Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
@ -179,6 +212,10 @@ public void testCopyFromHostWait() throws Exception {
header.write(new DataOutputStream(bout)); header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in); when(connection.getInputStream()).thenReturn(in);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
//Defaults to null, which is what we want to test //Defaults to null, which is what we want to test
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(null); .thenReturn(null);
@ -214,11 +251,14 @@ public void testCopyFromHostCompressFailure() throws Exception {
header.write(new DataOutputStream(bout)); header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray()); ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in); when(connection.getInputStream()).thenReturn(in);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt())) when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(immo); .thenReturn(immo);
doThrow(new java.lang.InternalError()) doThrow(new java.lang.InternalError()).when(immo)
.when(immo)
.shuffle(any(MapHost.class), any(InputStream.class), anyLong(), .shuffle(any(MapHost.class), any(InputStream.class), anyLong(),
anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class)); anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
@ -242,6 +282,10 @@ public void testInterruptInMemory() throws Exception {
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key); String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200); when(connection.getResponseCode()).thenReturn(200);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH)) when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
.thenReturn(replyHash); .thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1); ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
@ -294,6 +338,10 @@ public void testInterruptOnDisk() throws Exception {
final StuckInputStream in = final StuckInputStream in =
new StuckInputStream(new ByteArrayInputStream(bout.toByteArray())); new StuckInputStream(new ByteArrayInputStream(bout.toByteArray()));
when(connection.getInputStream()).thenReturn(in); when(connection.getInputStream()).thenReturn(in);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
doAnswer(new Answer<Void>() { doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock ignore) throws IOException { public Void answer(InvocationOnMock ignore) throws IOException {
in.close(); in.close();

View File

@ -23,6 +23,7 @@
import static org.jboss.netty.handler.codec.http.HttpMethod.GET; import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
@ -438,6 +439,13 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
sendError(ctx, METHOD_NOT_ALLOWED); sendError(ctx, METHOD_NOT_ALLOWED);
return; return;
} }
// Check whether the shuffle version is compatible
if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
|| !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
}
final Map<String,List<String>> q = final Map<String,List<String>> q =
new QueryStringDecoder(request.getUri()).getParameters(); new QueryStringDecoder(request.getUri()).getParameters();
final List<String> mapIds = splitMaps(q.get("map")); final List<String> mapIds = splitMaps(q.get("map"));
@ -543,6 +551,11 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
tokenSecret); tokenSecret);
response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
// Put shuffle version into http header
response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
int len = reply.length(); int len = reply.length();
LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" + LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" +
@ -627,6 +640,11 @@ protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) { HttpResponseStatus status) {
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Put shuffle version into http header
response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
response.setContent( response.setContent(
ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));

View File

@ -180,6 +180,10 @@ protected void sendError(ChannelHandlerContext ctx, String message,
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection)url.openConnection(); HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
conn.connect(); conn.connect();
DataInputStream input = new DataInputStream(conn.getInputStream()); DataInputStream input = new DataInputStream(conn.getInputStream());
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
@ -192,6 +196,35 @@ protected void sendError(ChannelHandlerContext ctx, String message,
failures.size() == 0); failures.size() == 0);
} }
@Test (timeout = 10000)
public void testIncompatibleShuffleVersion() throws Exception {
final int failureNum = 3;
Configuration conf = new Configuration();
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
ShuffleHandler shuffleHandler = new ShuffleHandler();
shuffleHandler.init(conf);
shuffleHandler.start();
// simulate a reducer that closes early by reading a single shuffle header
// then closing the connection
URL url = new URL("http://127.0.0.1:"
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
for (int i = 0; i < failureNum; ++i) {
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
i == 0 ? "mapreduce" : "other");
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
i == 1 ? "1.0.0" : "1.0.1");
conn.connect();
Assert.assertEquals(
HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
}
shuffleHandler.stop();
shuffleHandler.close();
}
@Test (timeout = 10000) @Test (timeout = 10000)
public void testMaxConnections() throws Exception { public void testMaxConnections() throws Exception {
@ -242,6 +275,10 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+ i + "_0"; + i + "_0";
URL url = new URL(URLstring); URL url = new URL(URLstring);
conns[i] = (HttpURLConnection)url.openConnection(); conns[i] = (HttpURLConnection)url.openConnection();
conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
} }
// Try to open numerous connections // Try to open numerous connections
@ -330,6 +367,10 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+ "/mapOutput?job=job_12345_0001&reduce=" + reducerId + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+ "&map=attempt_12345_1_m_1_0"); + "&map=attempt_12345_1_m_1_0");
HttpURLConnection conn = (HttpURLConnection) url.openConnection(); HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
conn.connect(); conn.connect();
byte[] byteArr = new byte[10000]; byte[] byteArr = new byte[10000];
try { try {