MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely closed channels (jlowe via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1410131 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
86ce5f6c91
commit
f13da263bf
@ -683,6 +683,9 @@ Release 0.23.5 - UNRELEASED
|
||||
|
||||
MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact
|
||||
the RM (jlowe via bobby)
|
||||
|
||||
MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely
|
||||
closed channels (jlowe via bobby)
|
||||
|
||||
Release 0.23.4
|
||||
|
||||
|
@ -37,6 +37,7 @@
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -45,6 +46,7 @@
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
@ -120,10 +122,16 @@ public class ShuffleHandler extends AbstractService
|
||||
public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
|
||||
public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
|
||||
|
||||
// pattern to identify errors related to the client closing the socket early
|
||||
// idea borrowed from Netty SslHandler
|
||||
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
|
||||
"^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
|
||||
Pattern.CASE_INSENSITIVE);
|
||||
|
||||
private int port;
|
||||
private ChannelFactory selector;
|
||||
private final ChannelGroup accepted = new DefaultChannelGroup();
|
||||
private HttpPipelineFactory pipelineFact;
|
||||
protected HttpPipelineFactory pipelineFact;
|
||||
private int sslFileBufferSize;
|
||||
|
||||
/**
|
||||
@ -319,13 +327,17 @@ public synchronized ByteBuffer getMeta() {
|
||||
}
|
||||
}
|
||||
|
||||
protected Shuffle getShuffle(Configuration conf) {
|
||||
return new Shuffle(conf);
|
||||
}
|
||||
|
||||
class HttpPipelineFactory implements ChannelPipelineFactory {
|
||||
|
||||
final Shuffle SHUFFLE;
|
||||
private SSLFactory sslFactory;
|
||||
|
||||
public HttpPipelineFactory(Configuration conf) throws Exception {
|
||||
SHUFFLE = new Shuffle(conf);
|
||||
SHUFFLE = getShuffle(conf);
|
||||
if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
|
||||
MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
|
||||
sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
|
||||
@ -465,7 +477,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
|
||||
lastMap.addListener(ChannelFutureListener.CLOSE);
|
||||
}
|
||||
|
||||
private void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
HttpRequest request, HttpResponse response, URL requestUri)
|
||||
throws IOException {
|
||||
SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
|
||||
@ -566,12 +578,12 @@ public void operationComplete(ChannelFuture future) {
|
||||
return writeFuture;
|
||||
}
|
||||
|
||||
private void sendError(ChannelHandlerContext ctx,
|
||||
protected void sendError(ChannelHandlerContext ctx,
|
||||
HttpResponseStatus status) {
|
||||
sendError(ctx, "", status);
|
||||
}
|
||||
|
||||
private void sendError(ChannelHandlerContext ctx, String message,
|
||||
protected void sendError(ChannelHandlerContext ctx, String message,
|
||||
HttpResponseStatus status) {
|
||||
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
|
||||
response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
|
||||
@ -590,6 +602,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||
if (cause instanceof TooLongFrameException) {
|
||||
sendError(ctx, BAD_REQUEST);
|
||||
return;
|
||||
} else if (cause instanceof IOException) {
|
||||
if (cause instanceof ClosedChannelException) {
|
||||
LOG.debug("Ignoring closed channel error", cause);
|
||||
return;
|
||||
}
|
||||
String message = String.valueOf(cause.getMessage());
|
||||
if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
|
||||
LOG.debug("Ignoring client socket close", cause);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
LOG.error("Shuffle error: ", cause);
|
||||
|
@ -17,17 +17,35 @@
|
||||
*/
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
import static org.apache.hadoop.test.MockitoMaker.make;
|
||||
import static org.apache.hadoop.test.MockitoMaker.stub;
|
||||
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.*;
|
||||
|
||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.http.HttpRequest;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.apache.hadoop.test.MockitoMaker.*;
|
||||
|
||||
public class TestShuffleHandler {
|
||||
static final long MiB = 1024 * 1024;
|
||||
@ -69,4 +87,76 @@ static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed,
|
||||
assertCounter("ShuffleOutputsOK", succeeded, rb);
|
||||
assertGauge("ShuffleConnections", connections, rb);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientClosesConnection() throws Exception {
|
||||
final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
|
||||
Configuration conf = new Configuration();
|
||||
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
|
||||
ShuffleHandler shuffleHandler = new ShuffleHandler() {
|
||||
@Override
|
||||
protected Shuffle getShuffle(Configuration conf) {
|
||||
// replace the shuffle handler with one stubbed for testing
|
||||
return new Shuffle(conf) {
|
||||
@Override
|
||||
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
|
||||
HttpRequest request, HttpResponse response, URL requestUri)
|
||||
throws IOException {
|
||||
}
|
||||
@Override
|
||||
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
|
||||
Channel ch, String user, String jobId, String mapId, int reduce)
|
||||
throws IOException {
|
||||
// send a shuffle header and a lot of data down the channel
|
||||
// to trigger a broken pipe
|
||||
ShuffleHeader header =
|
||||
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
header.write(dob);
|
||||
ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
||||
dob = new DataOutputBuffer();
|
||||
for (int i=0; i<100000; ++i) {
|
||||
header.write(dob);
|
||||
}
|
||||
return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
|
||||
}
|
||||
@Override
|
||||
protected void sendError(ChannelHandlerContext ctx,
|
||||
HttpResponseStatus status) {
|
||||
if (failures.size() == 0) {
|
||||
failures.add(new Error());
|
||||
ctx.getChannel().close();
|
||||
}
|
||||
}
|
||||
@Override
|
||||
protected void sendError(ChannelHandlerContext ctx, String message,
|
||||
HttpResponseStatus status) {
|
||||
if (failures.size() == 0) {
|
||||
failures.add(new Error());
|
||||
ctx.getChannel().close();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
shuffleHandler.init(conf);
|
||||
shuffleHandler.start();
|
||||
|
||||
// simulate a reducer that closes early by reading a single shuffle header
|
||||
// then closing the connection
|
||||
URL url = new URL("http://127.0.0.1:"
|
||||
+ shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
|
||||
+ "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
|
||||
HttpURLConnection conn = (HttpURLConnection)url.openConnection();
|
||||
conn.connect();
|
||||
DataInputStream input = new DataInputStream(conn.getInputStream());
|
||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
|
||||
ShuffleHeader header = new ShuffleHeader();
|
||||
header.readFields(input);
|
||||
input.close();
|
||||
|
||||
shuffleHandler.stop();
|
||||
Assert.assertTrue("sendError called when client closed connection",
|
||||
failures.size() == 0);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user