diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 75dca8f632..8b6581ccd0 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -285,6 +285,9 @@ Release 2.1.0-beta - UNRELEASED
MAPREDUCE-5199. Removing ApplicationTokens file as it is no longer needed.
(Daryn Sharp via vinodkv)
+ MAPREDUCE-5192. Allow for alternate resolutions of TaskCompletionEvents.
+ (cdouglas via acmurthy)
+
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
diff --git a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
index e4c1001a71..d33fa6b52a 100644
--- a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
+++ b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
@@ -271,12 +271,19 @@
+
+
+
+
+
-
+
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
index acc85645f4..313e9f5f32 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
-import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -37,11 +36,9 @@ class EventFetcher<K,V> extends Thread {
private final TaskUmbilicalProtocol umbilical;
private final ShuffleScheduler<K,V> scheduler;
private int fromEventIdx = 0;
- private int maxEventsToFetch;
- private ExceptionReporter exceptionReporter = null;
+ private final int maxEventsToFetch;
+ private final ExceptionReporter exceptionReporter;
- private int maxMapRuntime = 0;
-
private volatile boolean stopped = false;
public EventFetcher(TaskAttemptID reduce,
@@ -113,7 +110,8 @@ public void shutDown() {
* from a given event ID.
* @throws IOException
*/
- protected int getMapCompletionEvents() throws IOException {
+ protected int getMapCompletionEvents()
+ throws IOException, InterruptedException {
int numNewMaps = 0;
TaskCompletionEvent events[] = null;
@@ -129,14 +127,7 @@ protected int getMapCompletionEvents() throws IOException {
LOG.debug("Got " + events.length + " map completion events from " +
fromEventIdx);
- // Check if the reset is required.
- // Since there is no ordering of the task completion events at the
- // reducer, the only option to sync with the new jobtracker is to reset
- // the events index
- if (update.shouldReset()) {
- fromEventIdx = 0;
- scheduler.resetKnownMaps();
- }
+ assert !update.shouldReset() : "Unexpected legacy state";
// Update the last seen event ID
fromEventIdx += events.length;
@@ -148,49 +139,14 @@ protected int getMapCompletionEvents() throws IOException {
// 3. Remove TIPFAILED maps from neededOutputs since we don't need their
// outputs at all.
for (TaskCompletionEvent event : events) {
- switch (event.getTaskStatus()) {
- case SUCCEEDED:
- URI u = getBaseURI(event.getTaskTrackerHttp());
- scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
- u.toString(),
- event.getTaskAttemptId());
- numNewMaps ++;
- int duration = event.getTaskRunTime();
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- scheduler.informMaxMapRunTime(maxMapRuntime);
- }
- break;
- case FAILED:
- case KILLED:
- case OBSOLETE:
- scheduler.obsoleteMapOutput(event.getTaskAttemptId());
- LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
- " map-task: '" + event.getTaskAttemptId() + "'");
- break;
- case TIPFAILED:
- scheduler.tipFailed(event.getTaskAttemptId().getTaskID());
- LOG.info("Ignoring output of failed map TIP: '" +
- event.getTaskAttemptId() + "'");
- break;
+ scheduler.resolve(event);
+ if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {
+ ++numNewMaps;
}
}
} while (events.length == maxEventsToFetch);
return numNewMaps;
}
-
- private URI getBaseURI(String url) {
- StringBuffer baseUrl = new StringBuffer(url);
- if (!url.endsWith("/")) {
- baseUrl.append("/");
- }
- baseUrl.append("mapOutput?job=");
- baseUrl.append(reduce.getJobID());
- baseUrl.append("&reduce=");
- baseUrl.append(reduce.getTaskID().getId());
- baseUrl.append("&map=");
- URI u = URI.create(baseUrl.toString());
- return u;
- }
-}
\ No newline at end of file
+
+}
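
getMapCompletionEvents() now pages through the umbilical and hands every event to ShuffleScheduler.resolve() instead of interpreting statuses inline; only SUCCEEDED events still count toward numNewMaps. A minimal sketch of the resulting paging contract, assuming a hypothetical fetchPage() stand-in for the real umbilical.getMapCompletionEvents() call:

    import java.io.IOException;
    import org.apache.hadoop.mapred.TaskCompletionEvent;

    // Sketch only: fetchPage() is hypothetical, not part of this patch.
    abstract class EventDrain {
      abstract TaskCompletionEvent[] fetchPage(int fromEventIdx, int max)
          throws IOException;

      int drainEvents(ShuffleScheduler<?,?> scheduler, int maxEventsToFetch)
          throws IOException, InterruptedException {
        int numNewMaps = 0;
        int fromEventIdx = 0;
        TaskCompletionEvent[] events;
        do {
          events = fetchPage(fromEventIdx, maxEventsToFetch);
          fromEventIdx += events.length; // advance the cursor past this page
          for (TaskCompletionEvent event : events) {
            scheduler.resolve(event);    // status-specific handling lives here now
            if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {
              ++numNewMaps;              // only successes add fetchable output
            }
          }
        } while (events.length == maxEventsToFetch); // a short page means caught up
        return numNewMaps;
      }
    }
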
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 15e3d94398..06e518015f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -72,7 +72,7 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
private final Counters.Counter wrongMapErrs;
private final Counters.Counter wrongReduceErrs;
private final MergeManager<K,V> merger;
- private final ShuffleScheduler<K,V> scheduler;
+ private final ShuffleSchedulerImpl<K,V> scheduler;
private final ShuffleClientMetrics metrics;
private final ExceptionReporter exceptionReporter;
private final int id;
@@ -90,7 +90,7 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
private static SSLFactory sslFactory;
public Fetcher(JobConf job, TaskAttemptID reduceId,
- ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
+ ShuffleSchedulerImpl<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
this.reporter = reporter;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
index bcb88ec7f2..06c007e158 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
@@ -49,7 +49,7 @@ public class Shuffle<K,V> implements ShuffleConsumerPlugin<K,V>, ExceptionRepo
private ShuffleClientMetrics metrics;
private TaskUmbilicalProtocol umbilical;
- private ShuffleScheduler<K,V> scheduler;
+ private ShuffleSchedulerImpl<K,V> scheduler;
private MergeManager<K,V> merger;
private Throwable throwable = null;
private String throwingThreadName = null;
@@ -70,8 +70,8 @@ public void init(ShuffleConsumerPlugin.Context context) {
this.taskStatus = context.getStatus();
this.reduceTask = context.getReduceTask();
- scheduler = new ShuffleScheduler<K,V>(jobConf, taskStatus, this,
- copyPhase, context.getShuffledMapsCounter(),
+ scheduler = new ShuffleSchedulerImpl<K,V>(jobConf, taskStatus, reduceId,
+ this, copyPhase, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
merger = createMergeManager(context);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
index 1eb8cb10dc..95c750930a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
@@ -18,432 +18,30 @@
package org.apache.hadoop.mapreduce.task.reduce;
import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskStatus;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.reduce.MapHost.State;
-import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
-class ShuffleScheduler<K,V> {
- static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
- protected Long initialValue() {
- return 0L;
- }
- };
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ShuffleScheduler<K,V> {
- private static final Log LOG = LogFactory.getLog(ShuffleScheduler.class);
- private static final int MAX_MAPS_AT_ONCE = 20;
- private static final long INITIAL_PENALTY = 10000;
- private static final float PENALTY_GROWTH_RATE = 1.3f;
- private final static int REPORT_FAILURE_LIMIT = 10;
-
- private final boolean[] finishedMaps;
- private final int totalMaps;
- private int remainingMaps;
- private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
- private Set<MapHost> pendingHosts = new HashSet<MapHost>();
- private Set<TaskAttemptID> obsoleteMaps = new HashSet<TaskAttemptID>();
-
- private final Random random = new Random(System.currentTimeMillis());
- private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
- private final Referee referee = new Referee();
- private final Map<TaskAttemptID, IntWritable> failureCounts =
- new HashMap<TaskAttemptID, IntWritable>();
- private final Map<String, IntWritable> hostFailures =
- new HashMap<String, IntWritable>();
- private final TaskStatus status;
- private final ExceptionReporter reporter;
- private final int abortFailureLimit;
- private final Progress progress;
- private final Counters.Counter shuffledMapsCounter;
- private final Counters.Counter reduceShuffleBytes;
- private final Counters.Counter failedShuffleCounter;
-
- private final long startTime;
- private long lastProgressTime;
-
- private int maxMapRuntime = 0;
- private int maxFailedUniqueFetches = 5;
- private int maxFetchFailuresBeforeReporting;
-
- private long totalBytesShuffledTillNow = 0;
- private DecimalFormat mbpsFormat = new DecimalFormat("0.00");
-
- private boolean reportReadErrorImmediately = true;
- private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
-
- public ShuffleScheduler(JobConf job, TaskStatus status,
- ExceptionReporter reporter,
- Progress progress,
- Counters.Counter shuffledMapsCounter,
- Counters.Counter reduceShuffleBytes,
- Counters.Counter failedShuffleCounter) {
- totalMaps = job.getNumMapTasks();
- abortFailureLimit = Math.max(30, totalMaps / 10);
- remainingMaps = totalMaps;
- finishedMaps = new boolean[remainingMaps];
- this.reporter = reporter;
- this.status = status;
- this.progress = progress;
- this.shuffledMapsCounter = shuffledMapsCounter;
- this.reduceShuffleBytes = reduceShuffleBytes;
- this.failedShuffleCounter = failedShuffleCounter;
- this.startTime = System.currentTimeMillis();
- lastProgressTime = startTime;
- referee.start();
- this.maxFailedUniqueFetches = Math.min(totalMaps,
- this.maxFailedUniqueFetches);
- this.maxFetchFailuresBeforeReporting = job.getInt(
- MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
- this.reportReadErrorImmediately = job.getBoolean(
- MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
-
- this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
- MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
- }
-
- public synchronized void copySucceeded(TaskAttemptID mapId,
- MapHost host,
- long bytes,
- long millis,
- MapOutput output
- ) throws IOException {
- failureCounts.remove(mapId);
- hostFailures.remove(host.getHostName());
- int mapIndex = mapId.getTaskID().getId();
-
- if (!finishedMaps[mapIndex]) {
- output.commit();
- finishedMaps[mapIndex] = true;
- shuffledMapsCounter.increment(1);
- if (--remainingMaps == 0) {
- notifyAll();
- }
-
- // update the status
- totalBytesShuffledTillNow += bytes;
- updateStatus();
- reduceShuffleBytes.increment(bytes);
- lastProgressTime = System.currentTimeMillis();
- LOG.debug("map " + mapId + " done " + status.getStateString());
- }
- }
-
- private void updateStatus() {
- float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
- int mapsDone = totalMaps - remainingMaps;
- long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
-
- float transferRate = mbs / secsSinceStart;
- progress.set((float) mapsDone / totalMaps);
- String statusString = mapsDone + " / " + totalMaps + " copied.";
- status.setStateString(statusString);
-
- progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
- + mbpsFormat.format(transferRate) + " MB/s)");
- }
-
- public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
- boolean readError, boolean connectExcpt) {
- host.penalize();
- int failures = 1;
- if (failureCounts.containsKey(mapId)) {
- IntWritable x = failureCounts.get(mapId);
- x.set(x.get() + 1);
- failures = x.get();
- } else {
- failureCounts.put(mapId, new IntWritable(1));
- }
- String hostname = host.getHostName();
- if (hostFailures.containsKey(hostname)) {
- IntWritable x = hostFailures.get(hostname);
- x.set(x.get() + 1);
- } else {
- hostFailures.put(hostname, new IntWritable(1));
- }
- if (failures >= abortFailureLimit) {
- try {
- throw new IOException(failures + " failures downloading " + mapId);
- } catch (IOException ie) {
- reporter.reportException(ie);
- }
- }
-
- checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
-
- checkReducerHealth();
-
- long delay = (long) (INITIAL_PENALTY *
- Math.pow(PENALTY_GROWTH_RATE, failures));
- if (delay > maxDelay) {
- delay = maxDelay;
- }
-
- penalties.add(new Penalty(host, delay));
-
- failedShuffleCounter.increment(1);
- }
-
- // Notify the JobTracker
- // after every read error, if 'reportReadErrorImmediately' is true or
- // after every 'maxFetchFailuresBeforeReporting' failures
- private void checkAndInformJobTracker(
- int failures, TaskAttemptID mapId, boolean readError,
- boolean connectExcpt) {
- if (connectExcpt || (reportReadErrorImmediately && readError)
- || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
- LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
- status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
- }
- }
-
- private void checkReducerHealth() {
- final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
- final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
- final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
-
- long totalFailures = failedShuffleCounter.getValue();
- int doneMaps = totalMaps - remainingMaps;
-
- boolean reducerHealthy =
- (((float)totalFailures / (totalFailures + doneMaps))
- < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-
- // check if the reducer has progressed enough
- boolean reducerProgressedEnough =
- (((float)doneMaps / totalMaps)
- >= MIN_REQUIRED_PROGRESS_PERCENT);
-
- // check if the reducer is stalled for a long time
- // duration for which the reducer is stalled
- int stallDuration =
- (int)(System.currentTimeMillis() - lastProgressTime);
-
- // duration for which the reducer ran with progress
- int shuffleProgressDuration =
- (int)(lastProgressTime - startTime);
-
- // min time the reducer should run without getting killed
- int minShuffleRunDuration =
- (shuffleProgressDuration > maxMapRuntime)
- ? shuffleProgressDuration
- : maxMapRuntime;
-
- boolean reducerStalled =
- (((float)stallDuration / minShuffleRunDuration)
- >= MAX_ALLOWED_STALL_TIME_PERCENT);
-
- // kill if not healthy and has insufficient progress
- if ((failureCounts.size() >= maxFailedUniqueFetches ||
- failureCounts.size() == (totalMaps - doneMaps))
- && !reducerHealthy
- && (!reducerProgressedEnough || reducerStalled)) {
- LOG.fatal("Shuffle failed with too many fetch failures " +
- "and insufficient progress!");
- String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
- reporter.reportException(new IOException(errorMsg));
- }
-
- }
-
- public synchronized void tipFailed(TaskID taskId) {
- if (!finishedMaps[taskId.getId()]) {
- finishedMaps[taskId.getId()] = true;
- if (--remainingMaps == 0) {
- notifyAll();
- }
- updateStatus();
- }
- }
-
- public synchronized void addKnownMapOutput(String hostName,
- String hostUrl,
- TaskAttemptID mapId) {
- MapHost host = mapLocations.get(hostName);
- if (host == null) {
- host = new MapHost(hostName, hostUrl);
- mapLocations.put(hostName, host);
- }
- host.addKnownMap(mapId);
-
- // Mark the host as pending
- if (host.getState() == State.PENDING) {
- pendingHosts.add(host);
- notifyAll();
- }
- }
-
- public synchronized void obsoleteMapOutput(TaskAttemptID mapId) {
- obsoleteMaps.add(mapId);
- }
-
- public synchronized void putBackKnownMapOutput(MapHost host,
- TaskAttemptID mapId) {
- host.addKnownMap(mapId);
- }
-
- public synchronized MapHost getHost() throws InterruptedException {
- while(pendingHosts.isEmpty()) {
- wait();
- }
-
- MapHost host = null;
- Iterator<MapHost> iter = pendingHosts.iterator();
- int numToPick = random.nextInt(pendingHosts.size());
- for (int i=0; i <= numToPick; ++i) {
- host = iter.next();
- }
-
- pendingHosts.remove(host);
- host.markBusy();
-
- LOG.info("Assiging " + host + " with " + host.getNumKnownMapOutputs() +
- " to " + Thread.currentThread().getName());
- shuffleStart.set(System.currentTimeMillis());
-
- return host;
- }
-
- public synchronized List<TaskAttemptID> getMapsForHost(MapHost host) {
- List<TaskAttemptID> list = host.getAndClearKnownMaps();
- Iterator<TaskAttemptID> itr = list.iterator();
- List<TaskAttemptID> result = new ArrayList<TaskAttemptID>();
- int includedMaps = 0;
- int totalSize = list.size();
- // find the maps that we still need, up to the limit
- while (itr.hasNext()) {
- TaskAttemptID id = itr.next();
- if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
- result.add(id);
- if (++includedMaps >= MAX_MAPS_AT_ONCE) {
- break;
- }
- }
- }
- // put back the maps left after the limit
- while (itr.hasNext()) {
- TaskAttemptID id = itr.next();
- if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
- host.addKnownMap(id);
- }
- }
- LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
- host + " to " + Thread.currentThread().getName());
- return result;
- }
-
- public synchronized void freeHost(MapHost host) {
- if (host.getState() != State.PENALIZED) {
- if (host.markAvailable() == State.PENDING) {
- pendingHosts.add(host);
- notifyAll();
- }
- }
- LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
- (System.currentTimeMillis()-shuffleStart.get()) + "ms");
- }
-
- public synchronized void resetKnownMaps() {
- mapLocations.clear();
- obsoleteMaps.clear();
- pendingHosts.clear();
- }
-
/**
* Wait until the shuffle finishes or until the timeout.
* @param millis maximum wait time
* @return true if the shuffle is done
* @throws InterruptedException
*/
- public synchronized boolean waitUntilDone(int millis
- ) throws InterruptedException {
- if (remainingMaps > 0) {
- wait(millis);
- return remainingMaps == 0;
- }
- return true;
- }
-
+ public boolean waitUntilDone(int millis) throws InterruptedException;
+
/**
- * A structure that records the penalty for a host.
+ * Interpret a {@link TaskCompletionEvent} from the event stream.
+ * @param tce Intermediate output metadata
*/
- private static class Penalty implements Delayed {
- MapHost host;
- private long endTime;
-
- Penalty(MapHost host, long delay) {
- this.host = host;
- this.endTime = System.currentTimeMillis() + delay;
- }
+ public void resolve(TaskCompletionEvent tce)
+ throws IOException, InterruptedException;
- public long getDelay(TimeUnit unit) {
- long remainingTime = endTime - System.currentTimeMillis();
- return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
- }
+ public void close() throws InterruptedException;
- public int compareTo(Delayed o) {
- long other = ((Penalty) o).endTime;
- return endTime == other ? 0 : (endTime < other ? -1 : 1);
- }
-
- }
-
- /**
- * A thread that takes hosts off of the penalty list when the timer expires.
- */
- private class Referee extends Thread {
- public Referee() {
- setName("ShufflePenaltyReferee");
- setDaemon(true);
- }
-
- public void run() {
- try {
- while (true) {
- // take the first host that has an expired penalty
- MapHost host = penalties.take().host;
- synchronized (ShuffleScheduler.this) {
- if (host.markAvailable() == MapHost.State.PENDING) {
- pendingHosts.add(host);
- ShuffleScheduler.this.notifyAll();
- }
- }
- }
- } catch (InterruptedException ie) {
- return;
- } catch (Throwable t) {
- reporter.reportException(t);
- }
- }
- }
-
- public void close() throws InterruptedException {
- referee.interrupt();
- referee.join();
- }
-
- public synchronized void informMaxMapRunTime(int duration) {
- if (duration > maxMapRuntime) {
- maxMapRuntime = duration;
- }
- }
}
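
ShuffleScheduler.java is reduced to the three methods the shuffle consumer actually needs, which is the point of MAPREDUCE-5192: how a TaskCompletionEvent is resolved into scheduler state becomes pluggable. A hypothetical sketch (names invented, not part of this patch) of an alternate implementation that wraps the stock one and tallies how events are classified:

    import java.io.IOException;
    import java.util.EnumMap;
    import java.util.Map;
    import org.apache.hadoop.mapred.TaskCompletionEvent;

    public class CountingShuffleScheduler<K,V> implements ShuffleScheduler<K,V> {
      private final ShuffleScheduler<K,V> delegate;
      private final Map<TaskCompletionEvent.Status, Integer> counts =
          new EnumMap<TaskCompletionEvent.Status, Integer>(
              TaskCompletionEvent.Status.class);

      public CountingShuffleScheduler(ShuffleScheduler<K,V> delegate) {
        this.delegate = delegate;
      }

      @Override
      public void resolve(TaskCompletionEvent tce)
          throws IOException, InterruptedException {
        Integer c = counts.get(tce.getTaskStatus());  // tally the classification
        counts.put(tce.getTaskStatus(), c == null ? 1 : c + 1);
        delegate.resolve(tce);                        // stock behaviour unchanged
      }

      @Override
      public boolean waitUntilDone(int millis) throws InterruptedException {
        return delegate.waitUntilDone(millis);
      }

      @Override
      public void close() throws InterruptedException {
        delegate.close();
      }
    }
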
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
new file mode 100644
index 0000000000..76affb234b
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.reduce.MapHost.State;
+import org.apache.hadoop.util.Progress;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
+ static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
+ protected Long initialValue() {
+ return 0L;
+ }
+ };
+
+ private static final Log LOG = LogFactory.getLog(ShuffleSchedulerImpl.class);
+ private static final int MAX_MAPS_AT_ONCE = 20;
+ private static final long INITIAL_PENALTY = 10000;
+ private static final float PENALTY_GROWTH_RATE = 1.3f;
+ private final static int REPORT_FAILURE_LIMIT = 10;
+
+ private final boolean[] finishedMaps;
+
+ private final int totalMaps;
+ private int remainingMaps;
+ private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+ private Set<MapHost> pendingHosts = new HashSet<MapHost>();
+ private Set<TaskAttemptID> obsoleteMaps = new HashSet<TaskAttemptID>();
+
+ private final TaskAttemptID reduceId;
+ private final Random random = new Random();
+ private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
+ private final Referee referee = new Referee();
+ private final Map<TaskAttemptID, IntWritable> failureCounts =
+ new HashMap<TaskAttemptID, IntWritable>();
+ private final Map<String, IntWritable> hostFailures =
+ new HashMap<String, IntWritable>();
+ private final TaskStatus status;
+ private final ExceptionReporter reporter;
+ private final int abortFailureLimit;
+ private final Progress progress;
+ private final Counters.Counter shuffledMapsCounter;
+ private final Counters.Counter reduceShuffleBytes;
+ private final Counters.Counter failedShuffleCounter;
+
+ private final long startTime;
+ private long lastProgressTime;
+
+ private volatile int maxMapRuntime = 0;
+ private final int maxFailedUniqueFetches;
+ private final int maxFetchFailuresBeforeReporting;
+
+ private long totalBytesShuffledTillNow = 0;
+ private final DecimalFormat mbpsFormat = new DecimalFormat("0.00");
+
+ private final boolean reportReadErrorImmediately;
+ private long maxDelay = MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY;
+
+ public ShuffleSchedulerImpl(JobConf job, TaskStatus status,
+ TaskAttemptID reduceId,
+ ExceptionReporter reporter,
+ Progress progress,
+ Counters.Counter shuffledMapsCounter,
+ Counters.Counter reduceShuffleBytes,
+ Counters.Counter failedShuffleCounter) {
+ totalMaps = job.getNumMapTasks();
+ abortFailureLimit = Math.max(30, totalMaps / 10);
+
+ remainingMaps = totalMaps;
+ finishedMaps = new boolean[remainingMaps];
+ this.reporter = reporter;
+ this.status = status;
+ this.reduceId = reduceId;
+ this.progress = progress;
+ this.shuffledMapsCounter = shuffledMapsCounter;
+ this.reduceShuffleBytes = reduceShuffleBytes;
+ this.failedShuffleCounter = failedShuffleCounter;
+ this.startTime = System.currentTimeMillis();
+ lastProgressTime = startTime;
+ referee.start();
+ this.maxFailedUniqueFetches = Math.min(totalMaps, 5);
+ this.maxFetchFailuresBeforeReporting = job.getInt(
+ MRJobConfig.SHUFFLE_FETCH_FAILURES, REPORT_FAILURE_LIMIT);
+ this.reportReadErrorImmediately = job.getBoolean(
+ MRJobConfig.SHUFFLE_NOTIFY_READERROR, true);
+
+ this.maxDelay = job.getLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY,
+ MRJobConfig.DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY);
+ }
+
+ @Override
+ public void resolve(TaskCompletionEvent event) {
+ switch (event.getTaskStatus()) {
+ case SUCCEEDED:
+ URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());
+ addKnownMapOutput(u.getHost() + ":" + u.getPort(),
+ u.toString(),
+ event.getTaskAttemptId());
+ maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());
+ break;
+ case FAILED:
+ case KILLED:
+ case OBSOLETE:
+ obsoleteMapOutput(event.getTaskAttemptId());
+ LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
+ " map-task: '" + event.getTaskAttemptId() + "'");
+ break;
+ case TIPFAILED:
+ tipFailed(event.getTaskAttemptId().getTaskID());
+ LOG.info("Ignoring output of failed map TIP: '" +
+ event.getTaskAttemptId() + "'");
+ break;
+ }
+ }
+
+ static URI getBaseURI(TaskAttemptID reduceId, String url) {
+ StringBuffer baseUrl = new StringBuffer(url);
+ if (!url.endsWith("/")) {
+ baseUrl.append("/");
+ }
+ baseUrl.append("mapOutput?job=");
+ baseUrl.append(reduceId.getJobID());
+ baseUrl.append("&reduce=");
+ baseUrl.append(reduceId.getTaskID().getId());
+ baseUrl.append("&map=");
+ URI u = URI.create(baseUrl.toString());
+ return u;
+ }
+
+ public synchronized void copySucceeded(TaskAttemptID mapId,
+ MapHost host,
+ long bytes,
+ long millis,
+ MapOutput output
+ ) throws IOException {
+ failureCounts.remove(mapId);
+ hostFailures.remove(host.getHostName());
+ int mapIndex = mapId.getTaskID().getId();
+
+ if (!finishedMaps[mapIndex]) {
+ output.commit();
+ finishedMaps[mapIndex] = true;
+ shuffledMapsCounter.increment(1);
+ if (--remainingMaps == 0) {
+ notifyAll();
+ }
+
+ // update the status
+ totalBytesShuffledTillNow += bytes;
+ updateStatus();
+ reduceShuffleBytes.increment(bytes);
+ lastProgressTime = System.currentTimeMillis();
+ LOG.debug("map " + mapId + " done " + status.getStateString());
+ }
+ }
+
+ private void updateStatus() {
+ float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+ int mapsDone = totalMaps - remainingMaps;
+ long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
+
+ float transferRate = mbs / secsSinceStart;
+ progress.set((float) mapsDone / totalMaps);
+ String statusString = mapsDone + " / " + totalMaps + " copied.";
+ status.setStateString(statusString);
+
+ progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
+ + mbpsFormat.format(transferRate) + " MB/s)");
+ }
+
+ public synchronized void copyFailed(TaskAttemptID mapId, MapHost host,
+ boolean readError, boolean connectExcpt) {
+ host.penalize();
+ int failures = 1;
+ if (failureCounts.containsKey(mapId)) {
+ IntWritable x = failureCounts.get(mapId);
+ x.set(x.get() + 1);
+ failures = x.get();
+ } else {
+ failureCounts.put(mapId, new IntWritable(1));
+ }
+ String hostname = host.getHostName();
+ if (hostFailures.containsKey(hostname)) {
+ IntWritable x = hostFailures.get(hostname);
+ x.set(x.get() + 1);
+ } else {
+ hostFailures.put(hostname, new IntWritable(1));
+ }
+ if (failures >= abortFailureLimit) {
+ try {
+ throw new IOException(failures + " failures downloading " + mapId);
+ } catch (IOException ie) {
+ reporter.reportException(ie);
+ }
+ }
+
+ checkAndInformJobTracker(failures, mapId, readError, connectExcpt);
+
+ checkReducerHealth();
+
+ long delay = (long) (INITIAL_PENALTY *
+ Math.pow(PENALTY_GROWTH_RATE, failures));
+ if (delay > maxDelay) {
+ delay = maxDelay;
+ }
+
+ penalties.add(new Penalty(host, delay));
+
+ failedShuffleCounter.increment(1);
+ }
+
+ // Notify the JobTracker
+ // after every read error, if 'reportReadErrorImmediately' is true or
+ // after every 'maxFetchFailuresBeforeReporting' failures
+ private void checkAndInformJobTracker(
+ int failures, TaskAttemptID mapId, boolean readError,
+ boolean connectExcpt) {
+ if (connectExcpt || (reportReadErrorImmediately && readError)
+ || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
+ LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
+ status.addFetchFailedMap((org.apache.hadoop.mapred.TaskAttemptID) mapId);
+ }
+ }
+
+ private void checkReducerHealth() {
+ final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
+ final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
+ final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+
+ long totalFailures = failedShuffleCounter.getValue();
+ int doneMaps = totalMaps - remainingMaps;
+
+ boolean reducerHealthy =
+ (((float)totalFailures / (totalFailures + doneMaps))
+ < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+
+ // check if the reducer has progressed enough
+ boolean reducerProgressedEnough =
+ (((float)doneMaps / totalMaps)
+ >= MIN_REQUIRED_PROGRESS_PERCENT);
+
+ // check if the reducer is stalled for a long time
+ // duration for which the reducer is stalled
+ int stallDuration =
+ (int)(System.currentTimeMillis() - lastProgressTime);
+
+ // duration for which the reducer ran with progress
+ int shuffleProgressDuration =
+ (int)(lastProgressTime - startTime);
+
+ // min time the reducer should run without getting killed
+ int minShuffleRunDuration =
+ Math.max(shuffleProgressDuration, maxMapRuntime);
+
+ boolean reducerStalled =
+ (((float)stallDuration / minShuffleRunDuration)
+ >= MAX_ALLOWED_STALL_TIME_PERCENT);
+
+ // kill if not healthy and has insufficient progress
+ if ((failureCounts.size() >= maxFailedUniqueFetches ||
+ failureCounts.size() == (totalMaps - doneMaps))
+ && !reducerHealthy
+ && (!reducerProgressedEnough || reducerStalled)) {
+ LOG.fatal("Shuffle failed with too many fetch failures " +
+ "and insufficient progress!");
+ String errorMsg = "Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.";
+ reporter.reportException(new IOException(errorMsg));
+ }
+
+ }
+
+ public synchronized void tipFailed(TaskID taskId) {
+ if (!finishedMaps[taskId.getId()]) {
+ finishedMaps[taskId.getId()] = true;
+ if (--remainingMaps == 0) {
+ notifyAll();
+ }
+ updateStatus();
+ }
+ }
+
+ public synchronized void addKnownMapOutput(String hostName,
+ String hostUrl,
+ TaskAttemptID mapId) {
+ MapHost host = mapLocations.get(hostName);
+ if (host == null) {
+ host = new MapHost(hostName, hostUrl);
+ mapLocations.put(hostName, host);
+ }
+ host.addKnownMap(mapId);
+
+ // Mark the host as pending
+ if (host.getState() == State.PENDING) {
+ pendingHosts.add(host);
+ notifyAll();
+ }
+ }
+
+
+ public synchronized void obsoleteMapOutput(TaskAttemptID mapId) {
+ obsoleteMaps.add(mapId);
+ }
+
+ public synchronized void putBackKnownMapOutput(MapHost host,
+ TaskAttemptID mapId) {
+ host.addKnownMap(mapId);
+ }
+
+
+ public synchronized MapHost getHost() throws InterruptedException {
+ while(pendingHosts.isEmpty()) {
+ wait();
+ }
+
+ MapHost host = null;
+ Iterator<MapHost> iter = pendingHosts.iterator();
+ int numToPick = random.nextInt(pendingHosts.size());
+ for (int i=0; i <= numToPick; ++i) {
+ host = iter.next();
+ }
+
+ pendingHosts.remove(host);
+ host.markBusy();
+
+ LOG.info("Assigning " + host + " with " + host.getNumKnownMapOutputs() +
+ " to " + Thread.currentThread().getName());
+ shuffleStart.set(System.currentTimeMillis());
+
+ return host;
+ }
+
+ public synchronized List<TaskAttemptID> getMapsForHost(MapHost host) {
+ List<TaskAttemptID> list = host.getAndClearKnownMaps();
+ Iterator<TaskAttemptID> itr = list.iterator();
+ List<TaskAttemptID> result = new ArrayList<TaskAttemptID>();
+ int includedMaps = 0;
+ int totalSize = list.size();
+ // find the maps that we still need, up to the limit
+ while (itr.hasNext()) {
+ TaskAttemptID id = itr.next();
+ if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
+ result.add(id);
+ if (++includedMaps >= MAX_MAPS_AT_ONCE) {
+ break;
+ }
+ }
+ }
+ // put back the maps left after the limit
+ while (itr.hasNext()) {
+ TaskAttemptID id = itr.next();
+ if (!obsoleteMaps.contains(id) && !finishedMaps[id.getTaskID().getId()]) {
+ host.addKnownMap(id);
+ }
+ }
+ LOG.info("assigned " + includedMaps + " of " + totalSize + " to " +
+ host + " to " + Thread.currentThread().getName());
+ return result;
+ }
+
+ public synchronized void freeHost(MapHost host) {
+ if (host.getState() != State.PENALIZED) {
+ if (host.markAvailable() == State.PENDING) {
+ pendingHosts.add(host);
+ notifyAll();
+ }
+ }
+ LOG.info(host + " freed by " + Thread.currentThread().getName() + " in " +
+ (System.currentTimeMillis()-shuffleStart.get()) + "ms");
+ }
+
+ public synchronized void resetKnownMaps() {
+ mapLocations.clear();
+ obsoleteMaps.clear();
+ pendingHosts.clear();
+ }
+
+ /**
+ * Wait until the shuffle finishes or until the timeout.
+ * @param millis maximum wait time
+ * @return true if the shuffle is done
+ * @throws InterruptedException
+ */
+ @Override
+ public synchronized boolean waitUntilDone(int millis
+ ) throws InterruptedException {
+ if (remainingMaps > 0) {
+ wait(millis);
+ return remainingMaps == 0;
+ }
+ return true;
+ }
+
+ /**
+ * A structure that records the penalty for a host.
+ */
+ private static class Penalty implements Delayed {
+ MapHost host;
+ private long endTime;
+
+ Penalty(MapHost host, long delay) {
+ this.host = host;
+ this.endTime = System.currentTimeMillis() + delay;
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ long remainingTime = endTime - System.currentTimeMillis();
+ return unit.convert(remainingTime, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ long other = ((Penalty) o).endTime;
+ return endTime == other ? 0 : (endTime < other ? -1 : 1);
+ }
+
+ }
+
+ /**
+ * A thread that takes hosts off of the penalty list when the timer expires.
+ */
+ private class Referee extends Thread {
+ public Referee() {
+ setName("ShufflePenaltyReferee");
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ while (true) {
+ // take the first host that has an expired penalty
+ MapHost host = penalties.take().host;
+ synchronized (ShuffleSchedulerImpl.this) {
+ if (host.markAvailable() == MapHost.State.PENDING) {
+ pendingHosts.add(host);
+ ShuffleSchedulerImpl.this.notifyAll();
+ }
+ }
+ }
+ } catch (InterruptedException ie) {
+ return;
+ } catch (Throwable t) {
+ reporter.reportException(t);
+ }
+ }
+ }
+
+ @Override
+ public void close() throws InterruptedException {
+ referee.interrupt();
+ referee.join();
+ }
+
+}
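
The implementation keeps the old back-off for flaky hosts: copyFailed() grows a host's penalty geometrically with its failure count and caps it at MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY. A small self-contained check of the numbers, assuming the 60-second default cap:

    // delay = INITIAL_PENALTY * PENALTY_GROWTH_RATE^failures, capped at maxDelay
    public class PenaltyMath {
      public static void main(String[] args) {
        final long initialPenalty = 10000L; // INITIAL_PENALTY above (10s)
        final float growthRate = 1.3f;      // PENALTY_GROWTH_RATE above
        final long maxDelay = 60000L;       // assumed 60s default cap
        for (int failures = 1; failures <= 8; ++failures) {
          long delay = (long) (initialPenalty * Math.pow(growthRate, failures));
          System.out.println(failures + " failure(s) -> "
              + Math.min(delay, maxDelay) + " ms");
        }
        // 13000, 16900, 21970, 28561, 37129, 48268, then capped at 60000
      }
    }
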
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
index 84ac656cf9..1a0bfc751d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
@@ -43,7 +43,8 @@
public class TestEventFetcher {
@Test
- public void testConsecutiveFetch() throws IOException {
+ public void testConsecutiveFetch()
+ throws IOException, InterruptedException {
final int MAX_EVENTS_TO_FETCH = 100;
TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 1);
@@ -63,7 +64,8 @@ public void testConsecutiveFetch() throws IOException {
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH*2, 3));
@SuppressWarnings("unchecked")
- ShuffleScheduler<String,String> scheduler = mock(ShuffleScheduler.class);
+ ShuffleScheduler<String,String> scheduler =
+ mock(ShuffleScheduler.class);
ExceptionReporter reporter = mock(ExceptionReporter.class);
EventFetcherForTest<String,String> ef =
@@ -79,8 +81,8 @@ public void testConsecutiveFetch() throws IOException {
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid));
inOrder.verify(umbilical).getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH*2), eq(MAX_EVENTS_TO_FETCH), eq(tid));
- verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).addKnownMapOutput(
- anyString(), anyString(), any(TaskAttemptID.class));
+ verify(scheduler, times(MAX_EVENTS_TO_FETCH*2 + 3)).resolve(
+ any(TaskCompletionEvent.class));
}
private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
@@ -108,7 +110,8 @@ public EventFetcherForTest(TaskAttemptID reduce,
}
@Override
- public int getMapCompletionEvents() throws IOException {
+ public int getMapCompletionEvents()
+ throws IOException, InterruptedException {
return super.getMapCompletionEvents();
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index 570bdadd92..8a37399fe6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -56,9 +56,10 @@ public static class FakeFetcher<K,V> extends Fetcher<K,V> {
private HttpURLConnection connection;
public FakeFetcher(JobConf job, TaskAttemptID reduceId,
- ShuffleScheduler<K,V> scheduler, MergeManagerImpl<K,V> merger, Reporter reporter,
- ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter,
- SecretKey jobTokenSecret, HttpURLConnection connection) {
+ ShuffleSchedulerImpl<K,V> scheduler, MergeManagerImpl<K,V> merger,
+ Reporter reporter, ShuffleClientMetrics metrics,
+ ExceptionReporter exceptionReporter, SecretKey jobTokenSecret,
+ HttpURLConnection connection) {
super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter,
jobTokenSecret);
this.connection = connection;
@@ -79,7 +80,7 @@ public void testCopyFromHostConnectionTimeout() throws Exception {
LOG.info("testCopyFromHostConnectionTimeout");
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
- ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+ ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
Reporter r = mock(Reporter.class);
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
@@ -127,7 +128,7 @@ public void testCopyFromHostBogusHeader() throws Exception {
LOG.info("testCopyFromHostBogusHeader");
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
- ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+ ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
Reporter r = mock(Reporter.class);
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
@@ -182,7 +183,7 @@ public void testCopyFromHostWait() throws Exception {
LOG.info("testCopyFromHostWait");
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
- ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+ ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
Reporter r = mock(Reporter.class);
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
@@ -240,7 +241,7 @@ public void testCopyFromHostExtraBytes() throws Exception {
LOG.info("testCopyFromHostWaitExtraBytes");
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
- ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+ ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
@@ -256,7 +257,6 @@ public void testCopyFromHostExtraBytes() throws Exception {
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
r, metrics, except, key, connection);
-
MapHost host = new MapHost("localhost", "http://localhost:8080/");
@@ -315,7 +315,7 @@ public void testCopyFromHostCompressFailure() throws Exception {
LOG.info("testCopyFromHostCompressFailure");
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
- ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
+ ShuffleSchedulerImpl<Text, Text> ss = mock(ShuffleSchedulerImpl.class);
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
Reporter r = mock(Reporter.class);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
index f4ed330406..355a419426 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
@@ -47,8 +47,10 @@ public void addFetchFailedMap(TaskAttemptID mapTaskId) {
};
Progress progress = new Progress();
- ShuffleScheduler scheduler = new ShuffleScheduler(job, status, null,
- progress, null, null, null);
+ TaskAttemptID reduceId = new TaskAttemptID("314159", 0, TaskType.REDUCE,
+ 0, 0);
+ ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, status,
+ reduceId, null, progress, null, null, null);
JobID jobId = new JobID();
TaskID taskId1 = new TaskID(jobId, TaskType.REDUCE, 1);
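
For reference when reading these tests and fetch logs: the reduce attempt built above belongs to job_314159_0000, so the package-private getBaseURI() helper would produce the base URL shown below. The tracker address is invented for illustration, and the snippet must live in org.apache.hadoop.mapreduce.task.reduce to see the helper:

    import java.net.URI;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;

    public class BaseUriDemo {
      public static void main(String[] args) {
        TaskAttemptID reduceId =
            new TaskAttemptID("314159", 0, TaskType.REDUCE, 0, 0);
        URI u = ShuffleSchedulerImpl.getBaseURI(reduceId, "http://node1:8080");
        // prints http://node1:8080/mapOutput?job=job_314159_0000&reduce=0&map=
        // the Fetcher appends the map attempt IDs it wants after "map="
        System.out.println(u);
      }
    }
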