diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f4acf9d16e..a3a0c9fabf 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -683,6 +683,9 @@ Release 0.23.5 - UNRELEASED MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact the RM (jlowe via bobby) + + MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely + closed channels (jlowe via bobby) Release 0.23.4 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 61cccd36be..cb3dfadc71 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -37,6 +37,7 @@ import java.net.InetSocketAddress; import java.net.URL; import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -45,6 +46,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import javax.crypto.SecretKey; @@ -120,10 +122,16 @@ public class ShuffleHandler extends AbstractService public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes"; public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; + // pattern to identify errors related to the client closing the socket early + // idea borrowed from Netty SslHandler + private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile( + "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$", + Pattern.CASE_INSENSITIVE); + private int port; private ChannelFactory selector; private final ChannelGroup accepted = new DefaultChannelGroup(); - private HttpPipelineFactory pipelineFact; + protected HttpPipelineFactory pipelineFact; private int sslFileBufferSize; /** @@ -319,13 +327,17 @@ public synchronized ByteBuffer getMeta() { } } + protected Shuffle getShuffle(Configuration conf) { + return new Shuffle(conf); + } + class HttpPipelineFactory implements ChannelPipelineFactory { final Shuffle SHUFFLE; private SSLFactory sslFactory; public HttpPipelineFactory(Configuration conf) throws Exception { - SHUFFLE = new Shuffle(conf); + SHUFFLE = getShuffle(conf); if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY, MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) { sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); @@ -465,7 +477,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) lastMap.addListener(ChannelFutureListener.CLOSE); } - private void verifyRequest(String appid, ChannelHandlerContext ctx, + protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException { SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid); @@ -566,12 +578,12 @@ public void operationComplete(ChannelFuture future) { return writeFuture; } - private void sendError(ChannelHandlerContext ctx, + protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { sendError(ctx, "", status); } - private void sendError(ChannelHandlerContext ctx, String message, + protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); @@ -590,6 +602,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) if (cause instanceof TooLongFrameException) { sendError(ctx, BAD_REQUEST); return; + } else if (cause instanceof IOException) { + if (cause instanceof ClosedChannelException) { + LOG.debug("Ignoring closed channel error", cause); + return; + } + String message = String.valueOf(cause.getMessage()); + if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) { + LOG.debug("Ignoring client socket close", cause); + return; + } } LOG.error("Shuffle error: ", cause); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java index c1526cc572..309a789da8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java @@ -17,17 +17,35 @@ */ package org.apache.hadoop.mapred; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.apache.hadoop.test.MockitoMaker.make; +import static org.apache.hadoop.test.MockitoMaker.stub; +import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; +import static org.junit.Assert.assertEquals; + +import java.io.DataInputStream; +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import org.apache.hadoop.metrics2.MetricsSystem; -import static org.apache.hadoop.test.MetricsAsserts.*; - +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; - +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.junit.Assert; import org.junit.Test; -import static org.junit.Assert.*; -import static org.apache.hadoop.test.MockitoMaker.*; public class TestShuffleHandler { static final long MiB = 1024 * 1024; @@ -69,4 +87,76 @@ static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed, assertCounter("ShuffleOutputsOK", succeeded, rb); assertGauge("ShuffleConnections", connections, rb); } + + @Test + public void testClientClosesConnection() throws Exception { + final ArrayList failures = new ArrayList(1); + Configuration conf = new Configuration(); + conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); + ShuffleHandler shuffleHandler = new ShuffleHandler() { + @Override + protected Shuffle getShuffle(Configuration conf) { + // replace the shuffle handler with one stubbed for testing + return new Shuffle(conf) { + @Override + protected void verifyRequest(String appid, ChannelHandlerContext ctx, + HttpRequest request, HttpResponse response, URL requestUri) + throws IOException { + } + @Override + protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, + Channel ch, String user, String jobId, String mapId, int reduce) + throws IOException { + // send a shuffle header and a lot of data down the channel + // to trigger a broken pipe + ShuffleHeader header = + new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); + DataOutputBuffer dob = new DataOutputBuffer(); + header.write(dob); + ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + dob = new DataOutputBuffer(); + for (int i=0; i<100000; ++i) { + header.write(dob); + } + return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength())); + } + @Override + protected void sendError(ChannelHandlerContext ctx, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error()); + ctx.getChannel().close(); + } + } + @Override + protected void sendError(ChannelHandlerContext ctx, String message, + HttpResponseStatus status) { + if (failures.size() == 0) { + failures.add(new Error()); + ctx.getChannel().close(); + } + } + }; + } + }; + shuffleHandler.init(conf); + shuffleHandler.start(); + + // simulate a reducer that closes early by reading a single shuffle header + // then closing the connection + URL url = new URL("http://127.0.0.1:" + + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY) + + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0"); + HttpURLConnection conn = (HttpURLConnection)url.openConnection(); + conn.connect(); + DataInputStream input = new DataInputStream(conn.getInputStream()); + Assert.assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode()); + ShuffleHeader header = new ShuffleHeader(); + header.readFields(input); + input.close(); + + shuffleHandler.stop(); + Assert.assertTrue("sendError called when client closed connection", + failures.size() == 0); + } }