MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the MapReduce shuffle-handler. Contributed by Rajesh Balamohan.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1580062 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fc53af9c4c
commit
a5c08eed16
@ -173,6 +173,9 @@ Release 2.4.0 - UNRELEASED
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
MAPREDUCE-5787. Added the ability to keep alive shuffle connections in the
|
||||
MapReduce shuffle-handler. (Rajesh Balamohan via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
MAPREDUCE-5464. Add analogs of the SLOTS_MILLIS counters that jive with the
|
||||
|
@ -151,6 +151,21 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.shuffle.connection-keep-alive.enable</name>
|
||||
<value>false</value>
|
||||
<description>set to true to support keep-alive connections.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.shuffle.connection-keep-alive.timeout</name>
|
||||
<value>5</value>
|
||||
<description>The number of seconds a shuffle client attempts to retain
|
||||
http connection. Refer "Keep-Alive: timeout=" header in
|
||||
Http specification
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>mapreduce.task.timeout</name>
|
||||
<value>600000</value>
|
||||
|
@ -23,7 +23,6 @@
|
||||
import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
|
||||
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
|
||||
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
|
||||
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED;
|
||||
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
|
||||
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
|
||||
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
|
||||
@ -41,6 +40,7 @@
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -110,6 +110,7 @@
|
||||
import org.jboss.netty.handler.ssl.SslHandler;
|
||||
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
|
||||
import org.jboss.netty.util.CharsetUtil;
|
||||
import org.mortbay.jetty.HttpHeaders;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
@ -156,6 +157,21 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||
public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
|
||||
public static final int DEFAULT_SHUFFLE_PORT = 13562;
|
||||
|
||||
public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED =
|
||||
"mapreduce.shuffle.connection-keep-alive.enable";
|
||||
public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false;
|
||||
|
||||
public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT =
|
||||
"mapreduce.shuffle.connection-keep-alive.timeout";
|
||||
public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5; //seconds
|
||||
|
||||
public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
|
||||
"mapreduce.shuffle.mapoutput-info.meta.cache.size";
|
||||
public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE =
|
||||
1000;
|
||||
|
||||
public static final String CONNECTION_CLOSE = "close";
|
||||
|
||||
public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
|
||||
"mapreduce.shuffle.ssl.file.buffer.size";
|
||||
|
||||
@ -167,6 +183,9 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||
public static final String MAX_SHUFFLE_THREADS = "mapreduce.shuffle.max.threads";
|
||||
// 0 implies Netty default of 2 * number of available processors
|
||||
public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
|
||||
boolean connectionKeepAliveEnabled = false;
|
||||
int connectionKeepAliveTimeOut;
|
||||
int mapOutputMetaInfoCacheSize;
|
||||
|
||||
@Metrics(about="Shuffle output metrics", context="mapred")
|
||||
static class ShuffleMetrics implements ChannelFutureListener {
|
||||
@ -328,6 +347,15 @@ protected void serviceStart() throws Exception {
|
||||
|
||||
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
|
||||
DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
|
||||
connectionKeepAliveEnabled =
|
||||
conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
|
||||
DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
|
||||
connectionKeepAliveTimeOut =
|
||||
Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
|
||||
DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
|
||||
mapOutputMetaInfoCacheSize =
|
||||
Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
|
||||
DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -459,6 +487,15 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
|
||||
}
|
||||
final Map<String,List<String>> q =
|
||||
new QueryStringDecoder(request.getUri()).getParameters();
|
||||
final List<String> keepAliveList = q.get("keepAlive");
|
||||
boolean keepAliveParam = false;
|
||||
if (keepAliveList != null && keepAliveList.size() == 1) {
|
||||
keepAliveParam = Boolean.valueOf(keepAliveList.get(0));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("KeepAliveParam : " + keepAliveList
|
||||
+ " : " + keepAliveParam);
|
||||
}
|
||||
}
|
||||
final List<String> mapIds = splitMaps(q.get("map"));
|
||||
final List<String> reduceQ = q.get("reduce");
|
||||
final List<String> jobQ = q.get("job");
|
||||
@ -466,7 +503,8 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
|
||||
LOG.debug("RECV: " + request.getUri() +
|
||||
"\n mapId: " + mapIds +
|
||||
"\n reduceId: " + reduceQ +
|
||||
"\n jobId: " + jobQ);
|
||||
"\n jobId: " + jobQ +
|
||||
"\n keepAlive: " + keepAliveParam);
|
||||
}
|
||||
|
||||
if (mapIds == null || reduceQ == null || jobQ == null) {
|
||||
@ -505,27 +543,46 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, MapOutputInfo> mapOutputInfoMap =
|
||||
new HashMap<String, MapOutputInfo>();
|
||||
Channel ch = evt.getChannel();
|
||||
String user = userRsrc.get(jobId);
|
||||
|
||||
// $x/$user/appcache/$appId/output/$mapId
|
||||
// TODO: Once Shuffle is out of NM, this can use MR APIs to convert
|
||||
// between App and Job
|
||||
String outputBasePathStr = getBaseLocation(jobId, user);
|
||||
|
||||
try {
|
||||
populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
|
||||
response, keepAliveParam, mapOutputInfoMap);
|
||||
} catch(IOException e) {
|
||||
ch.write(response);
|
||||
LOG.error("Shuffle error in populating headers :", e);
|
||||
String errorMessage = getErrorMessage(e);
|
||||
sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
ch.write(response);
|
||||
// TODO refactor the following into the pipeline
|
||||
ChannelFuture lastMap = null;
|
||||
for (String mapId : mapIds) {
|
||||
try {
|
||||
MapOutputInfo info = mapOutputInfoMap.get(mapId);
|
||||
if (info == null) {
|
||||
info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
|
||||
}
|
||||
lastMap =
|
||||
sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);
|
||||
sendMapOutput(ctx, ch, user, mapId,
|
||||
reduceId, info);
|
||||
if (null == lastMap) {
|
||||
sendError(ctx, NOT_FOUND);
|
||||
return;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Shuffle error :", e);
|
||||
StringBuffer sb = new StringBuffer(e.getMessage());
|
||||
Throwable t = e;
|
||||
while (t.getCause() != null) {
|
||||
sb.append(t.getCause().getMessage());
|
||||
t = t.getCause();
|
||||
}
|
||||
sendError(ctx,sb.toString() , INTERNAL_SERVER_ERROR);
|
||||
String errorMessage = getErrorMessage(e);
|
||||
sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -533,6 +590,99 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
|
||||
lastMap.addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
|
||||
private String getErrorMessage(Throwable t) {
|
||||
StringBuffer sb = new StringBuffer(t.getMessage());
|
||||
while (t.getCause() != null) {
|
||||
sb.append(t.getCause().getMessage());
|
||||
t = t.getCause();
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private String getBaseLocation(String jobId, String user) {
|
||||
final JobID jobID = JobID.forName(jobId);
|
||||
final ApplicationId appID =
|
||||
ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
|
||||
jobID.getId());
|
||||
final String baseStr =
|
||||
ContainerLocalizer.USERCACHE + "/" + user + "/"
|
||||
+ ContainerLocalizer.APPCACHE + "/"
|
||||
+ ConverterUtils.toString(appID) + "/output" + "/";
|
||||
return baseStr;
|
||||
}
|
||||
|
||||
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
|
||||
int reduce, String user) throws IOException {
|
||||
// Index file
|
||||
Path indexFileName =
|
||||
lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
|
||||
IndexRecord info =
|
||||
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
|
||||
|
||||
Path mapOutputFileName =
|
||||
lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
|
||||
}
|
||||
MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
|
||||
return outputInfo;
|
||||
}
|
||||
|
||||
protected void populateHeaders(List<String> mapIds, String outputBaseStr,
|
||||
String user, int reduce, HttpRequest request, HttpResponse response,
|
||||
boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
|
||||
throws IOException {
|
||||
|
||||
long contentLength = 0;
|
||||
for (String mapId : mapIds) {
|
||||
String base = outputBaseStr + mapId;
|
||||
MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
|
||||
if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
|
||||
mapOutputInfoMap.put(mapId, outputInfo);
|
||||
}
|
||||
// Index file
|
||||
Path indexFileName =
|
||||
lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
|
||||
IndexRecord info =
|
||||
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
|
||||
ShuffleHeader header =
|
||||
new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
header.write(dob);
|
||||
|
||||
contentLength += info.partLength;
|
||||
contentLength += dob.getLength();
|
||||
}
|
||||
|
||||
// Now set the response headers.
|
||||
setResponseHeaders(response, keepAliveParam, contentLength);
|
||||
}
|
||||
|
||||
protected void setResponseHeaders(HttpResponse response,
|
||||
boolean keepAliveParam, long contentLength) {
|
||||
if (!connectionKeepAliveEnabled && !keepAliveParam) {
|
||||
LOG.info("Setting connection close header...");
|
||||
response.setHeader(HttpHeaders.CONNECTION, CONNECTION_CLOSE);
|
||||
} else {
|
||||
response.setHeader(HttpHeaders.CONTENT_LENGTH,
|
||||
String.valueOf(contentLength));
|
||||
response.setHeader(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
|
||||
response.setHeader(HttpHeaders.KEEP_ALIVE, "timeout="
|
||||
+ connectionKeepAliveTimeOut);
|
||||
LOG.info("Content Length in shuffle : " + contentLength);
|
||||
}
|
||||
}
|
||||
|
||||
class MapOutputInfo {
|
||||
final Path mapOutputFileName;
|
||||
final IndexRecord indexRecord;
|
||||
|
||||
MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
|
||||
this.mapOutputFileName = mapOutputFileName;
|
||||
this.indexRecord = indexRecord;
|
||||
}
|
||||
}
|
||||
|
||||
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
HttpRequest request, HttpResponse response, URL requestUri)
|
||||
throws IOException {
|
||||
@ -575,39 +725,16 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
}
|
||||
|
||||
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
|
||||
String user, String jobId, String mapId, int reduce)
|
||||
String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
|
||||
throws IOException {
|
||||
// TODO replace w/ rsrc alloc
|
||||
// $x/$user/appcache/$appId/output/$mapId
|
||||
// TODO: Once Shuffle is out of NM, this can use MR APIs to convert between App and Job
|
||||
JobID jobID = JobID.forName(jobId);
|
||||
ApplicationId appID = ApplicationId.newInstance(
|
||||
Long.parseLong(jobID.getJtIdentifier()), jobID.getId());
|
||||
final String base =
|
||||
ContainerLocalizer.USERCACHE + "/" + user + "/"
|
||||
+ ContainerLocalizer.APPCACHE + "/"
|
||||
+ ConverterUtils.toString(appID) + "/output" + "/" + mapId;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("DEBUG0 " + base);
|
||||
}
|
||||
// Index file
|
||||
Path indexFileName = lDirAlloc.getLocalPathToRead(
|
||||
base + "/file.out.index", conf);
|
||||
// Map-output file
|
||||
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
|
||||
base + "/file.out", conf);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("DEBUG1 " + base + " : " + mapOutputFileName + " : "
|
||||
+ indexFileName);
|
||||
}
|
||||
final IndexRecord info =
|
||||
indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
|
||||
final IndexRecord info = mapOutputInfo.indexRecord;
|
||||
final ShuffleHeader header =
|
||||
new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
|
||||
final DataOutputBuffer dob = new DataOutputBuffer();
|
||||
header.write(dob);
|
||||
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
||||
final File spillfile = new File(mapOutputFileName.toString());
|
||||
final File spillfile =
|
||||
new File(mapOutputInfo.mapOutputFileName.toString());
|
||||
RandomAccessFile spill;
|
||||
try {
|
||||
spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
|
||||
|
@ -23,6 +23,8 @@
|
||||
import static org.apache.hadoop.test.MockitoMaker.make;
|
||||
import static org.apache.hadoop.test.MockitoMaker.stub;
|
||||
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
|
||||
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
|
||||
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assume.assumeTrue;
|
||||
|
||||
@ -39,6 +41,7 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.zip.CheckedOutputStream;
|
||||
import java.util.zip.Checksum;
|
||||
|
||||
@ -69,17 +72,24 @@
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
|
||||
import org.jboss.netty.handler.codec.http.HttpRequest;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.mortbay.jetty.HttpHeaders;
|
||||
|
||||
public class TestShuffleHandler {
|
||||
static final long MiB = 1024 * 1024;
|
||||
private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
|
||||
|
||||
/**
|
||||
* Test the validation of ShuffleHandler's meta-data's serialization and
|
||||
* de-serialization.
|
||||
*
|
||||
* @throws Exception exception
|
||||
*/
|
||||
@Test (timeout = 10000)
|
||||
public void testSerializeMeta() throws Exception {
|
||||
assertEquals(1, ShuffleHandler.deserializeMetaData(
|
||||
@ -90,6 +100,11 @@ public void testSerializeMeta() throws Exception {
|
||||
ShuffleHandler.serializeMetaData(8080)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate shuffle connection and input/output metrics.
|
||||
*
|
||||
* @throws Exception exception
|
||||
*/
|
||||
@Test (timeout = 10000)
|
||||
public void testShuffleMetrics() throws Exception {
|
||||
MetricsSystem ms = new MetricsSystemImpl();
|
||||
@ -120,6 +135,11 @@ static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed,
|
||||
assertGauge("ShuffleConnections", connections, rb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify client prematurely closing a connection.
|
||||
*
|
||||
* @throws Exception exception.
|
||||
*/
|
||||
@Test (timeout = 10000)
|
||||
public void testClientClosesConnection() throws Exception {
|
||||
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
|
||||
@ -130,6 +150,20 @@ public void testClientClosesConnection() throws Exception {
|
||||
protected Shuffle getShuffle(Configuration conf) {
|
||||
// replace the shuffle handler with one stubbed for testing
|
||||
return new Shuffle(conf) {
|
||||
@Override
|
||||
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
|
||||
int reduce, String user) throws IOException {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
protected void populateHeaders(List<String> mapIds, String jobId,
|
||||
String user, int reduce, HttpRequest request,
|
||||
HttpResponse response, boolean keepAliveParam,
|
||||
Map<String, MapOutputInfo> infoMap) throws IOException {
|
||||
// Only set response headers and skip everything else
|
||||
// send some dummy value for content-length
|
||||
super.setResponseHeaders(response, keepAliveParam, 100);
|
||||
}
|
||||
@Override
|
||||
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
HttpRequest request, HttpResponse response, URL requestUri)
|
||||
@ -137,7 +171,8 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
}
|
||||
@Override
|
||||
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
Channel ch, String user, String jobId, String mapId, int reduce)
|
||||
Channel ch, String user, String mapId, int reduce,
|
||||
MapOutputInfo info)
|
||||
throws IOException {
|
||||
// send a shuffle header and a lot of data down the channel
|
||||
// to trigger a broken pipe
|
||||
@ -147,7 +182,7 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
header.write(dob);
|
||||
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
||||
dob = new DataOutputBuffer();
|
||||
for (int i=0; i<100000; ++i) {
|
||||
for (int i = 0; i < 100000; ++i) {
|
||||
header.write(dob);
|
||||
}
|
||||
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
||||
@ -187,6 +222,7 @@ protected void sendError(ChannelHandlerContext ctx, String message,
|
||||
conn.connect();
|
||||
DataInputStream input = new DataInputStream(conn.getInputStream());
|
||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
Assert.assertEquals("close", conn.getHeaderField(HttpHeaders.CONNECTION));
|
||||
ShuffleHeader header = new ShuffleHeader();
|
||||
header.readFields(input);
|
||||
input.close();
|
||||
@ -196,6 +232,147 @@ protected void sendError(ChannelHandlerContext ctx, String message,
|
||||
failures.size() == 0);
|
||||
}
|
||||
|
||||
@Test(timeout = 10000)
|
||||
public void testKeepAlive() throws Exception {
|
||||
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
||||
conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
|
||||
// try setting to -ve keep alive timeout.
|
||||
conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
|
||||
ShuffleHandler shuffleHandler = new ShuffleHandler() {
|
||||
@Override
|
||||
protected Shuffle getShuffle(final Configuration conf) {
|
||||
// replace the shuffle handler with one stubbed for testing
|
||||
return new Shuffle(conf) {
|
||||
@Override
|
||||
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
|
||||
int reduce, String user) throws IOException {
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
HttpRequest request, HttpResponse response, URL requestUri)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void populateHeaders(List<String> mapIds, String jobId,
|
||||
String user, int reduce, HttpRequest request,
|
||||
HttpResponse response, boolean keepAliveParam,
|
||||
Map<String, MapOutputInfo> infoMap) throws IOException {
|
||||
// Send some dummy data (populate content length details)
|
||||
ShuffleHeader header =
|
||||
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
header.write(dob);
|
||||
dob = new DataOutputBuffer();
|
||||
for (int i = 0; i < 100000; ++i) {
|
||||
header.write(dob);
|
||||
}
|
||||
|
||||
long contentLength = dob.getLength();
|
||||
// for testing purpose;
|
||||
// disable connectinKeepAliveEnabled if keepAliveParam is available
|
||||
if (keepAliveParam) {
|
||||
connectionKeepAliveEnabled = false;
|
||||
}
|
||||
|
||||
super.setResponseHeaders(response, keepAliveParam, contentLength);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
Channel ch, String user, String mapId, int reduce,
|
||||
MapOutputInfo info) throws IOException {
|
||||
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
|
||||
|
||||
// send a shuffle header and a lot of data down the channel
|
||||
// to trigger a broken pipe
|
||||
ShuffleHeader header =
|
||||
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
header.write(dob);
|
||||
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
||||
dob = new DataOutputBuffer();
|
||||
for (int i = 0; i < 100000; ++i) {
|
||||
header.write(dob);
|
||||
}
|
||||
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendError(ChannelHandlerContext ctx,
|
||||
HttpResponseStatus status) {
|
||||
if (failures.size() == 0) {
|
||||
failures.add(new Error());
|
||||
ctx.getChannel().close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendError(ChannelHandlerContext ctx, String message,
|
||||
HttpResponseStatus status) {
|
||||
if (failures.size() == 0) {
|
||||
failures.add(new Error());
|
||||
ctx.getChannel().close();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
shuffleHandler.init(conf);
|
||||
shuffleHandler.start();
|
||||
|
||||
String shuffleBaseURL = "http://127.0.0.1:"
|
||||
+ shuffleHandler.getConfig().get(
|
||||
ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
|
||||
URL url =
|
||||
new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
|
||||
+ "map=attempt_12345_1_m_1_0");
|
||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
|
||||
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
|
||||
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
|
||||
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
|
||||
conn.connect();
|
||||
DataInputStream input = new DataInputStream(conn.getInputStream());
|
||||
Assert.assertEquals(HttpHeaders.KEEP_ALIVE,
|
||||
conn.getHeaderField(HttpHeaders.CONNECTION));
|
||||
Assert.assertEquals("timeout=1",
|
||||
conn.getHeaderField(HttpHeaders.KEEP_ALIVE));
|
||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
ShuffleHeader header = new ShuffleHeader();
|
||||
header.readFields(input);
|
||||
input.close();
|
||||
|
||||
// For keepAlive via URL
|
||||
url =
|
||||
new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
|
||||
+ "map=attempt_12345_1_m_1_0&keepAlive=true");
|
||||
conn = (HttpURLConnection) url.openConnection();
|
||||
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
|
||||
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
|
||||
conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
|
||||
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
|
||||
conn.connect();
|
||||
input = new DataInputStream(conn.getInputStream());
|
||||
Assert.assertEquals(HttpHeaders.KEEP_ALIVE,
|
||||
conn.getHeaderField(HttpHeaders.CONNECTION));
|
||||
Assert.assertEquals("timeout=1",
|
||||
conn.getHeaderField(HttpHeaders.KEEP_ALIVE));
|
||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
header = new ShuffleHeader();
|
||||
header.readFields(input);
|
||||
input.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* simulate a reducer that sends an invalid shuffle-header - sometimes a wrong
|
||||
* header_name and sometimes a wrong version
|
||||
*
|
||||
* @throws Exception exception
|
||||
*/
|
||||
@Test (timeout = 10000)
|
||||
public void testIncompatibleShuffleVersion() throws Exception {
|
||||
final int failureNum = 3;
|
||||
@ -224,7 +401,12 @@ public void testIncompatibleShuffleVersion() throws Exception {
|
||||
shuffleHandler.stop();
|
||||
shuffleHandler.close();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Validate the limit on number of shuffle connections.
|
||||
*
|
||||
* @throws Exception exception
|
||||
*/
|
||||
@Test (timeout = 10000)
|
||||
public void testMaxConnections() throws Exception {
|
||||
|
||||
@ -236,14 +418,29 @@ public void testMaxConnections() throws Exception {
|
||||
protected Shuffle getShuffle(Configuration conf) {
|
||||
// replace the shuffle handler with one stubbed for testing
|
||||
return new Shuffle(conf) {
|
||||
@Override
|
||||
protected MapOutputInfo getMapOutputInfo(String base, String mapId,
|
||||
int reduce, String user) throws IOException {
|
||||
// Do nothing.
|
||||
return null;
|
||||
}
|
||||
@Override
|
||||
protected void populateHeaders(List<String> mapIds, String jobId,
|
||||
String user, int reduce, HttpRequest request,
|
||||
HttpResponse response, boolean keepAliveParam,
|
||||
Map<String, MapOutputInfo> infoMap) throws IOException {
|
||||
// Do nothing.
|
||||
}
|
||||
@Override
|
||||
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
HttpRequest request, HttpResponse response, URL requestUri)
|
||||
throws IOException {
|
||||
// Do nothing.
|
||||
}
|
||||
@Override
|
||||
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
Channel ch, String user, String jobId, String mapId, int reduce)
|
||||
Channel ch, String user, String mapId, int reduce,
|
||||
MapOutputInfo info)
|
||||
throws IOException {
|
||||
// send a shuffle header and a lot of data down the channel
|
||||
// to trigger a broken pipe
|
||||
@ -308,7 +505,13 @@ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
|
||||
shuffleHandler.stop();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Validate the ownership of the map-output files being pulled in. The
|
||||
* local-file-system owner of the file should match the user component in the
|
||||
*
|
||||
* @throws Exception exception
|
||||
*/
|
||||
@Test(timeout = 100000)
|
||||
public void testMapFileAccess() throws IOException {
|
||||
// This will run only in NativeIO is enabled as SecureIOUtils need it
|
||||
@ -323,7 +526,7 @@ public void testMapFileAccess() throws IOException {
|
||||
TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
|
||||
ApplicationId appId = ApplicationId.newInstance(12345, 1);
|
||||
System.out.println(appId.toString());
|
||||
LOG.info(appId.toString());
|
||||
String appAttemptId = "attempt_12345_1_m_1_0";
|
||||
String user = "randomUser";
|
||||
String reducerId = "0";
|
||||
@ -341,6 +544,7 @@ protected Shuffle getShuffle(Configuration conf) {
|
||||
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
HttpRequest request, HttpResponse response, URL requestUri)
|
||||
throws IOException {
|
||||
// Do nothing.
|
||||
}
|
||||
|
||||
};
|
||||
@ -393,7 +597,7 @@ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
}
|
||||
}
|
||||
|
||||
public static void createShuffleHandlerFiles(File logDir, String user,
|
||||
private static void createShuffleHandlerFiles(File logDir, String user,
|
||||
String appId, String appAttemptId, Configuration conf,
|
||||
List<File> fileMap) throws IOException {
|
||||
String attemptDir =
|
||||
@ -412,8 +616,8 @@ public static void createShuffleHandlerFiles(File logDir, String user,
|
||||
createMapOutputFile(mapOutputFile, conf);
|
||||
}
|
||||
|
||||
public static void
|
||||
createMapOutputFile(File mapOutputFile, Configuration conf)
|
||||
private static void
|
||||
createMapOutputFile(File mapOutputFile, Configuration conf)
|
||||
throws IOException {
|
||||
FileOutputStream out = new FileOutputStream(mapOutputFile);
|
||||
out.write("Creating new dummy map output file. Used only for testing"
|
||||
@ -422,7 +626,7 @@ public static void createShuffleHandlerFiles(File logDir, String user,
|
||||
out.close();
|
||||
}
|
||||
|
||||
public static void createIndexFile(File indexFile, Configuration conf)
|
||||
private static void createIndexFile(File indexFile, Configuration conf)
|
||||
throws IOException {
|
||||
if (indexFile.exists()) {
|
||||
System.out.println("Deleting existing file");
|
||||
|
Loading…
Reference in New Issue
Block a user