MAPREDUCE-6518. Set SO_KEEPALIVE on shuffle connections. Contributed by Chang Li
This commit is contained in:
parent
d806a5bf07
commit
ee9b80acf8
@ -650,6 +650,9 @@ Release 2.7.2 - UNRELEASED
|
|||||||
TaskAttemptImpl#sendJHStartEventForAssignedFailTask (Bibin A Chundatt via
|
TaskAttemptImpl#sendJHStartEventForAssignedFailTask (Bibin A Chundatt via
|
||||||
jlowe)
|
jlowe)
|
||||||
|
|
||||||
|
MAPREDUCE-6518. Set SO_KEEPALIVE on shuffle connections (Chang Li via
|
||||||
|
jlowe)
|
||||||
|
|
||||||
Release 2.7.1 - 2015-07-06
|
Release 2.7.1 - 2015-07-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -494,6 +494,7 @@ protected void serviceStart() throws Exception {
|
|||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
throw new RuntimeException(ex);
|
throw new RuntimeException(ex);
|
||||||
}
|
}
|
||||||
|
bootstrap.setOption("child.keepAlive", true);
|
||||||
bootstrap.setPipelineFactory(pipelineFact);
|
bootstrap.setPipelineFactory(pipelineFact);
|
||||||
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
|
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
|
||||||
Channel ch = bootstrap.bind(new InetSocketAddress(port));
|
Channel ch = bootstrap.bind(new InetSocketAddress(port));
|
||||||
|
@ -80,6 +80,8 @@
|
|||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
import org.jboss.netty.channel.ChannelFuture;
|
||||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.channel.ChannelStateEvent;
|
||||||
|
import org.jboss.netty.channel.socket.SocketChannel;
|
||||||
import org.jboss.netty.channel.MessageEvent;
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
import org.jboss.netty.channel.AbstractChannel;
|
import org.jboss.netty.channel.AbstractChannel;
|
||||||
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
|
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
|
||||||
@ -140,6 +142,27 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class MockShuffleHandler2 extends org.apache.hadoop.mapred.ShuffleHandler {
|
||||||
|
boolean socketKeepAlive = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Shuffle getShuffle(final Configuration conf) {
|
||||||
|
return new Shuffle(conf) {
|
||||||
|
@Override
|
||||||
|
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||||
|
HttpRequest request, HttpResponse response, URL requestUri)
|
||||||
|
throws IOException {
|
||||||
|
SocketChannel channel = (SocketChannel)(ctx.getChannel());
|
||||||
|
socketKeepAlive = channel.getConfig().isKeepAlive();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean isSocketKeepAlive() {
|
||||||
|
return socketKeepAlive;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test the validation of ShuffleHandler's meta-data's serialization and
|
* Test the validation of ShuffleHandler's meta-data's serialization and
|
||||||
* de-serialization.
|
* de-serialization.
|
||||||
@ -423,6 +446,42 @@ protected void sendError(ChannelHandlerContext ctx, String message,
|
|||||||
input.close();
|
input.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testSocketKeepAlive() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
||||||
|
conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
|
||||||
|
// try setting to -ve keep alive timeout.
|
||||||
|
conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
|
||||||
|
HttpURLConnection conn = null;
|
||||||
|
MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
|
||||||
|
try {
|
||||||
|
shuffleHandler.init(conf);
|
||||||
|
shuffleHandler.start();
|
||||||
|
|
||||||
|
String shuffleBaseURL = "http://127.0.0.1:"
|
||||||
|
+ shuffleHandler.getConfig().get(
|
||||||
|
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
|
||||||
|
URL url =
|
||||||
|
new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
|
||||||
|
+ "map=attempt_12345_1_m_1_0");
|
||||||
|
conn = (HttpURLConnection) url.openConnection();
|
||||||
|
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
|
||||||
|
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
|
||||||
|
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
|
||||||
|
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
|
||||||
|
conn.connect();
|
||||||
|
conn.getInputStream();
|
||||||
|
Assert.assertTrue("socket should be set KEEP_ALIVE",
|
||||||
|
shuffleHandler.isSocketKeepAlive());
|
||||||
|
} finally {
|
||||||
|
if (conn != null) {
|
||||||
|
conn.disconnect();
|
||||||
|
}
|
||||||
|
shuffleHandler.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* simulate a reducer that sends an invalid shuffle-header - sometimes a wrong
|
* simulate a reducer that sends an invalid shuffle-header - sometimes a wrong
|
||||||
* header_name and sometimes a wrong version
|
* header_name and sometimes a wrong version
|
||||||
|
Loading…
Reference in New Issue
Block a user