diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index be2f84f017..c6889cb3bd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -65,6 +65,11 @@ class Fetcher<K,V> extends Thread {
   /* Default read timeout (in milliseconds) */
   private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
 
+  // This should be kept in sync with ShuffleHandler.FETCH_RETRY_DELAY.
+  private static final long FETCH_RETRY_DELAY_DEFAULT = 1000L;
+  static final int TOO_MANY_REQ_STATUS_CODE = 429;
+  private static final String FETCH_RETRY_AFTER_HEADER = "Retry-After";
+
   protected final Reporter reporter;
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
@@ -269,6 +274,13 @@ class Fetcher<K,V> extends Thread {
       } else {
         input = new DataInputStream(connection.getInputStream());
       }
+    } catch (TryAgainLaterException te) {
+      LOG.warn("Connection rejected by the host " + te.host +
+          ". Will retry later.");
+      scheduler.penalize(host, te.backoff);
+      for (TaskAttemptID left : remaining) {
+        scheduler.putBackKnownMapOutput(host, left);
+      }
     } catch (IOException ie) {
       boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);
@@ -427,6 +439,19 @@ class Fetcher<K,V> extends Thread {
       throws IOException {
     // Validate response code
     int rc = connection.getResponseCode();
+    // See if the ShuffleHandler rejected the connection due to too many
+    // reducer requests. If so, signal fetchers to back off.
+    if (rc == TOO_MANY_REQ_STATUS_CODE) {
+      long backoff = connection.getHeaderFieldLong(FETCH_RETRY_AFTER_HEADER,
+          FETCH_RETRY_DELAY_DEFAULT);
+      // in case we get a negative backoff from ShuffleHandler
+      if (backoff < 0) {
+        backoff = FETCH_RETRY_DELAY_DEFAULT;
+        LOG.warn("Got a negative backoff value from ShuffleHandler. Setting" +
Setting" + + " it to the default value " + FETCH_RETRY_DELAY_DEFAULT); + } + throw new TryAgainLaterException(backoff, url.getHost()); + } if (rc != HttpURLConnection.HTTP_OK) { throw new IOException( "Got invalid response code " + rc + " from " + url + @@ -728,4 +753,15 @@ class Fetcher extends Thread { } } } + + private static class TryAgainLaterException extends IOException { + public final long backoff; + public final String host; + + public TryAgainLaterException(long backoff, String host) { + super("Too many requests to a map host"); + this.backoff = backoff; + this.host = host; + } + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java index 935931dcef..dfb28de3c5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MapHost.java @@ -75,10 +75,6 @@ public class MapHost { state = State.BUSY; } - public synchronized void markPenalized() { - state = State.PENALIZED; - } - public synchronized int getNumKnownMapOutputs() { return maps.size(); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java index c0d7e0f063..a819771610 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java @@ -35,6 +35,7 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -105,7 +106,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { private final DecimalFormat mbpsFormat = new DecimalFormat("0.00"); private final boolean reportReadErrorImmediately; - private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY; + private long maxPenalty = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY; private int maxHostFailures; public ShuffleSchedulerImpl(JobConf job, TaskStatus status, @@ -136,7 +137,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { this.reportReadErrorImmediately = job.getBoolean( MRJobConfig.SHUFFLE_NOTIFY_READERROR, true); - this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, + this.maxPenalty = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY); this.maxHostFailures = job.getInt( MRJobConfig.MAX_SHUFFLE_FETCH_HOST_FAILURES, @@ -252,9 +253,26 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { } } + @VisibleForTesting + synchronized int hostFailureCount(String 
+    int failures = 0;
+    if (hostFailures.containsKey(hostname)) {
+      failures = hostFailures.get(hostname).get();
+    }
+    return failures;
+  }
+
+  @VisibleForTesting
+  synchronized int fetchFailureCount(TaskAttemptID mapId) {
+    int failures = 0;
+    if (failureCounts.containsKey(mapId)) {
+      failures = failureCounts.get(mapId).get();
+    }
+    return failures;
+  }
+
   public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
       boolean readError, boolean connectExcpt) {
-    host.penalize();
     int failures = 1;
     if (failureCounts.containsKey(mapId)) {
       IntWritable x = failureCounts.get(mapId);
@@ -290,15 +308,22 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
 
     long delay = (long) (INITIAL_PENALTY *
         Math.pow(PENALTY_GROWTH_RATE, failures));
-    if (delay > maxDelay) {
-      delay = maxDelay;
-    }
-
-    penalties.add(new Penalty(host, delay));
+    penalize(host, Math.min(delay, maxPenalty));
 
     failedShuffleCounter.increment(1);
   }
-  
+
+  /**
+   * Ask the shuffle scheduler to penalize a given host for a given amount
+   * of time before it reassigns a new fetcher to fetch from the host.
+   * @param host The host to penalize.
+   * @param delay The time to wait before retrying.
+   */
+  void penalize(MapHost host, long delay) {
+    host.penalize();
+    penalties.add(new Penalty(host, delay));
+  }
+
   public void reportLocalError(IOException ioe) {
     try {
       LOG.error("Shuffle failed : local error on this node: "
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 998b3de373..01e51e9d07 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
@@ -176,6 +177,27 @@ public class TestFetcher {
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
     verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
+
+  @Test
+  public void testCopyFromHostConnectionRejected() throws Exception {
+    when(connection.getResponseCode())
+        .thenReturn(Fetcher.TOO_MANY_REQ_STATUS_CODE);
+
+    Fetcher<Text, Text> fetcher = new FakeFetcher<>(job, id, ss, mm, r, metrics,
+        except, key, connection);
+    fetcher.copyFromHost(host);
+
+    Assert.assertEquals("No host failure is expected.",
+        0, ss.hostFailureCount(host.getHostName()));
+    Assert.assertEquals("No fetch failure is expected.",
+        0, ss.fetchFailureCount(map1ID));
+    Assert.assertEquals("No fetch failure is expected.",
+        0, ss.fetchFailureCount(map2ID));
+
+    verify(ss).penalize(eq(host), anyLong());
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
+  }
 
   @Test
   public void testCopyFromHostBogusHeader() throws Exception {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 558ee38548..4c1870970c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -92,7 +92,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.fusesource.leveldbjni.JniDBFactory;
 import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
@@ -166,6 +165,12 @@ public class ShuffleHandler extends AuxiliaryService {
   private static final String DATA_FILE_NAME = "file.out";
   private static final String INDEX_FILE_NAME = "file.out.index";
 
+  public static final HttpResponseStatus TOO_MANY_REQ_STATUS =
+      new HttpResponseStatus(429, "TOO MANY REQUESTS");
+  // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT.
+  public static final long FETCH_RETRY_DELAY = 1000L;
+  public static final String RETRY_AFTER_HEADER = "Retry-After";
+
   private int port;
   private ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -795,7 +800,6 @@ public class ShuffleHandler extends AuxiliaryService {
   }
 
   class Shuffle extends SimpleChannelUpstreamHandler {
-    private static final int MAX_WEIGHT = 10 * 1024 * 1024;
     private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
     private static final int ALLOWED_CONCURRENCY = 16;
 
@@ -875,7 +879,14 @@
         LOG.info(String.format("Current number of shuffle connections (%d) is "
            + "greater than or equal to the max allowed shuffle connections (%d)",
            accepted.size(), maxShuffleConnections));
-        evt.getChannel().close();
+
+        Map<String, String> headers = new HashMap<String, String>(1);
+        // Notify fetchers to back off for a while before closing the
+        // connection if the shuffle connection limit is hit. Fetchers are
+        // expected to handle this notification gracefully, that is, not
+        // treating this as a fetch failure.
+        headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
+        sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
         return;
       }
       accepted.add(evt.getChannel());
@@ -1245,6 +1256,11 @@
 
     protected void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
+      sendError(ctx, message, status, Collections.<String, String>emptyMap());
+    }
+
+    protected void sendError(ChannelHandlerContext ctx, String msg,
+        HttpResponseStatus status, Map<String, String> headers) {
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
       response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
       // Put shuffle version into http header
@@ -1252,8 +1268,11 @@
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
       response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      for (Map.Entry<String, String> header : headers.entrySet()) {
+        response.headers().set(header.getKey(), header.getValue());
+      }
       response.setContent(
-          ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+          ChannelBuffers.copiedBuffer(msg, CharsetUtil.UTF_8));
 
       // Close the connection as soon as the error message is sent.
       ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 1717588468..a927bf4f5e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -36,7 +36,6 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
-import java.net.SocketException;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -80,7 +79,6 @@ import org.apache.hadoop.yarn.server.records.Version;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.AbstractChannel;
@@ -609,13 +607,20 @@ public class TestShuffleHandler {
 
     // This connection should be closed because it is above the limit
     try {
-      conns[2].getInputStream();
       rc = conns[2].getResponseCode();
-      Assert.fail("Expected a SocketException");
-    } catch (SocketException se) {
+      Assert.assertEquals("Expected a too-many-requests response code",
+          ShuffleHandler.TOO_MANY_REQ_STATUS.getCode(), rc);
+      long backoff = Long.valueOf(
+          conns[2].getHeaderField(ShuffleHandler.RETRY_AFTER_HEADER));
+      Assert.assertTrue("The backoff value cannot be negative.", backoff > 0);
+      conns[2].getInputStream();
+      Assert.fail("Expected an IOException");
+    } catch (IOException ioe) {
       LOG.info("Expected - connection should not be open");
+    } catch (NumberFormatException ne) {
+      Assert.fail("Expected a numerical value for RETRY_AFTER header field");
     } catch (Exception e) {
-      Assert.fail("Expected a SocketException");
+      Assert.fail("Expected an IOException");
     }
 
     shuffleHandler.stop();
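
Note on the contract this patch establishes: the Retry-After header here carries a delay in milliseconds (FETCH_RETRY_DELAY = 1000L), not seconds as in the HTTP specification, and the fetcher clamps negative values back to its one-second default. The standalone sketch below is not part of the patch; it illustrates the client half of the 429 handshake with plain java.net.HttpURLConnection. The class name, shuffle URL, and port are illustrative assumptions only.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class RetryAfterSketch {
  // Mirrors Fetcher.TOO_MANY_REQ_STATUS_CODE and FETCH_RETRY_DELAY_DEFAULT above.
  private static final int TOO_MANY_REQUESTS = 429;
  private static final long DEFAULT_BACKOFF_MS = 1000L;

  // Returns the back-off in milliseconds if the server rejected the request
  // with 429, or -1 if it was accepted and the normal shuffle path can proceed.
  static long backoffMillis(HttpURLConnection connection) throws IOException {
    if (connection.getResponseCode() != TOO_MANY_REQUESTS) {
      return -1;
    }
    // getHeaderFieldLong returns the supplied default when the header is
    // missing or malformed; clamp negatives exactly as the patched Fetcher does.
    long backoff = connection.getHeaderFieldLong("Retry-After", DEFAULT_BACKOFF_MS);
    return backoff < 0 ? DEFAULT_BACKOFF_MS : backoff;
  }

  public static void main(String[] args) throws Exception {
    // Hypothetical shuffle URL; a real fetcher builds this from the map host,
    // and 13562 is only the default value of mapreduce.shuffle.port.
    URL url = new URL("http://localhost:13562/mapOutput?job=job_1&reduce=0");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    long backoff = backoffMillis(conn);
    if (backoff >= 0) {
      // A real reducer would instead penalize the host via the scheduler and
      // requeue its map outputs; sleeping stands in for that here.
      Thread.sleep(backoff);
    }
  }
}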