diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 3a19047483..a784fcf090 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -83,6 +83,8 @@ Trunk (unreleased changes)
HADOOP-4515. Configuration#getBoolean must not be case sensitive. (Sho Shimauchi via harsh)
+ HADOOP-7968. Errant println left in RPC.getHighestSupportedProtocol (Sho Shimauchi via harsh)
+
BUGS
HADOOP-7851. Configuration.getClasses() never returns the default value.
@@ -203,6 +205,8 @@ Release 0.23.1 - Unreleased
HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead.
(XieXianshan via harsh)
+ HADOOP-7975. Add LZ4 as an entry in the default codec list, missed by HADOOP-7657 (harsh)
+
OPTIMIZATIONS
BUG FIXES
@@ -269,6 +273,9 @@ Release 0.23.1 - Unreleased
HADOOP-7964. Deadlock in NetUtils and SecurityUtil class initialization.
(Daryn Sharp via suresh)
+ HADOOP-7974. TestViewFsTrash incorrectly determines the user's home
+ directory. (harsh via eli)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index cf59746868..321c1d8ef3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -791,7 +791,10 @@ VerProtocolImpl getHighestSupportedProtocol(RpcKind rpcKind,
String protocolName) {
Long highestVersion = 0L;
ProtoClassProtoImpl highest = null;
- System.out.println("Size of protoMap for " + rpcKind + " =" + getProtocolImplMap(rpcKind).size());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Size of protoMap for " + rpcKind + " ="
+ + getProtocolImplMap(rpcKind).size());
+ }
for (Map.Entry<ProtoNameVer, ProtoClassProtoImpl> pv :
getProtocolImplMap(rpcKind).entrySet()) {
if (pv.getKey().protocol.equals(protocolName)) {
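
The replacement above follows the standard commons-logging guard: wrapping LOG.debug in isDebugEnabled() skips the message construction entirely when debug logging is off. A minimal sketch of the idiom (the class name here is hypothetical):

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

class GuardedLogging {
  private static final Log LOG = LogFactory.getLog(GuardedLogging.class);

  void report(int size) {
    // Without the guard, the message string is built on every call and
    // then discarded by the logger; with it, nothing is allocated unless
    // debug logging is actually enabled.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Size of protoMap = " + size);
    }
  }
}
```
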
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 9763845786..9cf1eaf311 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -161,7 +161,7 @@
  <name>io.compression.codecs</name>
-  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec</value>
  <description>A list of the compression codec classes that can be used
      for compression/decompression.</description>
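
The io.compression.codecs list is consumed by CompressionCodecFactory, which maps file suffixes to codec classes. A minimal sketch of the lookup, assuming the Lz4Codec class and its native support are available on the cluster:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

class CodecLookup {
  // Resolve a codec by file suffix; with Lz4Codec in the default list,
  // a ".lz4" file now maps to it out of the box.
  static CompressionCodec forFile(String name) {
    Configuration conf = new Configuration(); // loads core-default.xml
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    return factory.getCodec(new Path(name)); // null if no codec matches
  }
}
```
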
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java
index 9239f2f1f8..7795c3f5f0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsTrash.java
@@ -78,17 +78,16 @@ public void setUp() throws Exception {
// set up viewfs's home dir root to point to home dir root on target
// But home dir is different on linux, mac etc.
// Figure it out by calling home dir on target
-
- String homeDir = fsTarget.getHomeDirectory().toUri().getPath();
- int indexOf2ndSlash = homeDir.indexOf('/', 1);
- String homeDirRoot = homeDir.substring(0, indexOf2ndSlash);
- ConfigUtil.addLink(conf, homeDirRoot,
- fsTarget.makeQualified(new Path(homeDirRoot)).toUri());
- ConfigUtil.setHomeDirConf(conf, homeDirRoot);
- Log.info("Home dir base " + homeDirRoot);
-
+
+ String homeDirRoot = fsTarget.getHomeDirectory()
+ .getParent().toUri().getPath();
+ ConfigUtil.addLink(conf, homeDirRoot,
+ fsTarget.makeQualified(new Path(homeDirRoot)).toUri());
+ ConfigUtil.setHomeDirConf(conf, homeDirRoot);
+ Log.info("Home dir base " + homeDirRoot);
+
fsView = ViewFileSystemTestSetup.setupForViewFs(conf, fsTarget);
-
+
// set working dir so that relative paths
//fsView.setWorkingDirectory(new Path(fsTarget.getWorkingDirectory().toUri().getPath()));
conf.set("fs.defaultFS", FsConstants.VIEWFS_URI.toString());
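
The old code took a substring up to the second slash, which always yields the first path component; that is wrong whenever the platform's home directories sit more than one level below the root. A minimal sketch of the difference (the path is illustrative only):

```java
import org.apache.hadoop.fs.Path;

class HomeDirRoot {
  public static void main(String[] args) {
    String homeDir = "/home/users/alice"; // as returned by getHomeDirectory()
    // Old approach: first path component only.
    String oldRoot = homeDir.substring(0, homeDir.indexOf('/', 1)); // "/home"
    // Fixed approach: the actual parent of the home directory.
    String newRoot = new Path(homeDir).getParent().toUri().getPath(); // "/home/users"
    System.out.println(oldRoot + " vs " + newRoot);
  }
}
```
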
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2b33c120a9..82bc6877d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -264,6 +264,8 @@ Release 0.23.1 - UNRELEASED
HDFS-69. Improve the 'dfsadmin' commandline help. (harsh)
+ HDFS-2788. HdfsServerConstants#DN_KEEPALIVE_TIMEOUT is dead code (eli)
+
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)
@@ -327,6 +329,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2707. HttpFS should read the hadoop-auth secret from a file
instead inline from the configuration. (tucu)
+ HDFS-2790. FSNamesystem.setTimes throws exception with wrong
+ configuration name in the message. (Arpit Gupta via eli)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 256e5d663e..a457e5e880 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -86,7 +86,6 @@ public String getClusterId() {
public static int READ_TIMEOUT_EXTENSION = 5 * 1000;
public static int WRITE_TIMEOUT = 8 * 60 * 1000;
public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
- public static int DN_KEEPALIVE_TIMEOUT = 5 * 1000;
/**
* Defines the NameNode role.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f1664f7e62..8edfb7fcbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1219,7 +1219,7 @@ void setTimes(String src, long mtime, long atime)
throws IOException, UnresolvedLinkException {
if (!isAccessTimeSupported() && atime != -1) {
throw new IOException("Access time for hdfs is not configured. " +
- " Please set dfs.support.accessTime configuration parameter.");
+ " Please set " + DFS_NAMENODE_ACCESSTIME_PRECISION_KEY + " configuration parameter.");
}
writeLock();
try {
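
The fix swaps the stale hard-coded key ("dfs.support.accessTime") for the DFSConfigKeys constant, so the error message can no longer drift from the real configuration name. A minimal, hypothetical sketch of the pattern:

```java
import java.io.IOException;

class AccessTimeCheck {
  // Stands in for DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY.
  static final String ACCESSTIME_PRECISION_KEY =
      "dfs.namenode.accesstime.precision";

  static void checkAccessTime(boolean supported, long atime)
      throws IOException {
    if (!supported && atime != -1) {
      // Referencing the constant keeps the message correct if the key
      // is ever renamed.
      throw new IOException("Access time for hdfs is not configured. "
          + " Please set " + ACCESSTIME_PRECISION_KEY
          + " configuration parameter.");
    }
  }
}
```
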
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index ea08bb1682..f18bbb201f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -481,6 +481,24 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3625. CapacityScheduler web-ui display of queue's used capacity is broken.
(Jason Lowe via mahadev)
+ MAPREDUCE-3596. Fix scheduler to handle cleaned up containers, which NMs
+ may subsequently report as running. (Vinod Kumar Vavilapalli via sseth)
+
+ MAPREDUCE-3656. Fixed a race condition in MR AM which is failing the sort
+ benchmark consistently. (Siddarth Seth via vinodkv)
+
+ MAPREDUCE-3532. Modified NM to report correct http address when an ephemeral
+ web port is configured. (Bhallamudi Venkata Siva Kamesh via vinodkv)
+
+ MAPREDUCE-3404. Corrected MR AM to honor speculative configuration and enable
+ speculating either maps or reduces. (Eric Payne via vinodkv)
+
+ MAPREDUCE-3649. Job End notification gives an error on calling back.
+ (Ravi Prakash via mahadev)
+
+ MAPREDUCE-3657. State machine visualize build fails. (Jason Lowe
+ via mahadev)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
index 0b4ea94e27..6d78a6a8c0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
@@ -22,7 +22,9 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -77,6 +79,9 @@ public class TaskAttemptListenerImpl extends CompositeService
private ConcurrentMap<WrappedJvmID, org.apache.hadoop.mapred.Task>
jvmIDToActiveAttemptMap
= new ConcurrentHashMap<WrappedJvmID, org.apache.hadoop.mapred.Task>();
+ private Set<WrappedJvmID> launchedJVMs = Collections
+ .newSetFromMap(new ConcurrentHashMap<WrappedJvmID, Boolean>());
+
private JobTokenSecretManager jobTokenSecretManager = null;
public TaskAttemptListenerImpl(AppContext context,
@@ -412,22 +417,28 @@ public JvmTask getTask(JvmContext context) throws IOException {
// Try to look up the task. We remove it directly as we don't give
// multiple tasks to a JVM
- org.apache.hadoop.mapred.Task task = jvmIDToActiveAttemptMap
- .remove(wJvmID);
- if (task != null) {
- LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
- jvmTask = new JvmTask(task, false);
-
- // remove the task as it is no more needed and free up the memory
- // Also we have already told the JVM to process a task, so it is no
- // longer pending, and further request should ask it to exit.
- } else {
+ if (!jvmIDToActiveAttemptMap.containsKey(wJvmID)) {
LOG.info("JVM with ID: " + jvmId + " is invalid and will be killed.");
jvmTask = TASK_FOR_INVALID_JVM;
+ } else {
+ if (!launchedJVMs.contains(wJvmID)) {
+ jvmTask = null;
+ LOG.info("JVM with ID: " + jvmId
+ + " asking for task before AM launch registered. Given null task");
+ } else {
+ // remove the task as it is no longer needed and free up the memory.
+ // Also we have already told the JVM to process a task, so it is no
+ // longer pending, and further requests should ask it to exit.
+ org.apache.hadoop.mapred.Task task =
+ jvmIDToActiveAttemptMap.remove(wJvmID);
+ launchedJVMs.remove(wJvmID);
+ LOG.info("JVM with ID: " + jvmId + " given task: " + task.getTaskID());
+ jvmTask = new JvmTask(task, false);
+ }
}
return jvmTask;
}
-
+
@Override
public void registerPendingTask(
org.apache.hadoop.mapred.Task task, WrappedJvmID jvmID) {
@@ -440,13 +451,12 @@ public void registerPendingTask(
@Override
public void registerLaunchedTask(
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID) {
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID,
+ WrappedJvmID jvmId) {
+ // The AM considers the task to be launched (it has asked the NM to
+ // launch it). The JVM will only be given a task after this registration.
+ launchedJVMs.add(jvmId);
- // The task is launched. Register this for expiry-tracking.
-
- // Timing can cause this to happen after the real JVM launches and gets a
- // task which is still fine as we will only be tracking for expiry a little
- // late than usual.
taskHeartbeatHandler.register(attemptID);
}
@@ -459,7 +469,12 @@ public void unregister(
// registration. Events are ordered at TaskAttempt, so unregistration will
// always come after registration.
- // remove the mapping if not already removed
+ // Remove from launchedJVMs before jvmIDToActiveAttemptMap to avoid
+ // synchronization issue with getTask(). getTask should be checking
+ // jvmIDToActiveAttemptMap before it checks launchedJVMs.
+
+ // remove the mappings if not already removed
+ launchedJVMs.remove(jvmID);
jvmIDToActiveAttemptMap.remove(jvmID);
//unregister this attempt
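
The launchedJVMs field above uses the standard trick for a concurrent set: the JDK has no ConcurrentHashSet, so Collections.newSetFromMap (Java 6+) builds a thread-safe Set view over a ConcurrentHashMap. A minimal sketch:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentSetDemo {
  public static void main(String[] args) {
    Set<String> launched =
        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    launched.add("jvm_1");                          // thread-safe add
    System.out.println(launched.contains("jvm_1")); // true
    launched.remove("jvm_1");                       // thread-safe remove
  }
}
```
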
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
index ae92cc0a3e..9ffe2181c9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java
@@ -19,12 +19,11 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
-import java.io.InputStream;
+import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
import java.net.Proxy;
+import java.net.URL;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +39,8 @@
* User can specify number of retry attempts and a time interval at which to
* attempt retries
* Cluster administrators can set final parameters to set maximum number of
- * tries (0 would disable job end notification) and max time interval
+ * tries (0 would disable job end notification), max time interval, and a
+ * proxy if needed
* The URL may contain sentinels which will be replaced by jobId and jobStatus
* (eg. SUCCEEDED/KILLED/FAILED)
*
@@ -59,8 +59,8 @@ public class JobEndNotifier implements Configurable {
/**
* Parse the URL that needs to be notified of the end of the job, along
- * with the number of retries in case of failure and the amount of time to
- * wait between retries
+ * with the number of retries in case of failure, the amount of time to
+ * wait between retries, and proxy settings
* @param conf the configuration
*/
public void setConf(Configuration conf) {
@@ -119,15 +119,19 @@ protected boolean notifyURLOnce() {
boolean success = false;
try {
Log.info("Job end notification trying " + urlToNotify);
- URLConnection conn = urlToNotify.openConnection(proxyToUse);
+ HttpURLConnection conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
conn.setConnectTimeout(5*1000);
conn.setReadTimeout(5*1000);
conn.setAllowUserInteraction(false);
- InputStream is = conn.getInputStream();
- conn.getContent();
- is.close();
- success = true;
- Log.info("Job end notification to " + urlToNotify + " succeeded");
+ if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+ Log.warn("Job end notification to " + urlToNotify +" failed with code: "
+ + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+ +"\"");
+ }
+ else {
+ success = true;
+ Log.info("Job end notification to " + urlToNotify + " succeeded");
+ }
} catch(IOException ioe) {
Log.warn("Job end notification to " + urlToNotify + " failed", ioe);
}
@@ -135,8 +139,8 @@ protected boolean notifyURLOnce() {
}
/**
- * Notify a server of the completion of a submitted job. The server must have
- * configured MRConfig.JOB_END_NOTIFICATION_URLS
+ * Notify a server of the completion of a submitted job. The user must have
+ * configured MRJobConfig.MR_JOB_END_NOTIFICATION_URL
* @param jobReport JobReport used to read JobId and JobStatus
* @throws InterruptedException
*/
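
The rewritten notifier downcasts to HttpURLConnection so it can test the HTTP status instead of treating any readable response as success. A minimal sketch of the same pattern, assuming a plain HTTP endpoint (the class and method names are hypothetical):

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

class NotifyOnce {
  static boolean notify(URL url) {
    try {
      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
      conn.setConnectTimeout(5 * 1000);
      conn.setReadTimeout(5 * 1000);
      conn.setAllowUserInteraction(false);
      // Strict HTTP_OK comparison, matching the patch: any non-200
      // response, even another 2xx, counts as a failed notification.
      return conn.getResponseCode() == HttpURLConnection.HTTP_OK;
    } catch (IOException ioe) {
      return false; // connection or read failure
    }
  }
}
```
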
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 8ba241ec02..6097e377d1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -258,7 +258,7 @@ public void init(final Configuration conf) {
dispatcher.register(TaskAttemptEventType.class,
new TaskAttemptEventDispatcher());
dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
-
+
if (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
|| conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
//optional service to speculate on task attempts' progress
@@ -881,9 +881,31 @@ public SpeculatorEventDispatcher(Configuration config) {
}
@Override
public void handle(SpeculatorEvent event) {
- if (!disabled &&
- (conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
- || conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false))) {
+ if (disabled) {
+ return;
+ }
+
+ TaskId tId = event.getTaskID();
+ TaskType tType = null;
+ /* event's TaskId will be null if the event type is JOB_CREATE or
+ * ATTEMPT_STATUS_UPDATE
+ */
+ if (tId != null) {
+ tType = tId.getTaskType();
+ }
+ boolean shouldMapSpec =
+ conf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ boolean shouldReduceSpec =
+ conf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+ /* The point of the following is to allow the MAP and REDUCE speculative
+ * config values to be independent:
+ * IF spec-exec is turned on for maps AND the task is a map task
+ * OR IF spec-exec is turned on for reduces AND the task is a reduce task
+ * THEN call the speculator to handle the event.
+ */
+ if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
+ || (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
// Speculator IS enabled, direct the event to there.
speculator.handle(event);
}
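
The reworked handler makes map and reduce speculation independent: an event is forwarded to the speculator only when speculation is enabled for that event's task type, or when the event carries no task at all. A standalone restatement of the predicate, with the config lookups simplified to booleans:

```java
enum TaskType { MAP, REDUCE }

class SpeculationGate {
  static boolean shouldSpeculate(TaskType type, boolean mapSpec,
      boolean reduceSpec) {
    // A null type (JOB_CREATE / ATTEMPT_STATUS_UPDATE events) passes
    // whenever either kind of speculation is on.
    return (mapSpec && (type == null || type == TaskType.MAP))
        || (reduceSpec && (type == null || type == TaskType.REDUCE));
  }

  public static void main(String[] args) {
    // Map-only speculation no longer drags reduce tasks along.
    System.out.println(shouldSpeculate(TaskType.MAP, true, false));    // true
    System.out.println(shouldSpeculate(TaskType.REDUCE, true, false)); // false
    System.out.println(shouldSpeculate(null, true, false));            // true
  }
}
```
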
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
index 7002e69d52..1d2a0a4061 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptListener.java
@@ -45,8 +45,9 @@ public interface TaskAttemptListener {
*
* @param attemptID
* the id of the attempt for this JVM.
+ * @param jvmID the ID of the JVM.
*/
- void registerLaunchedTask(TaskAttemptId attemptID);
+ void registerLaunchedTask(TaskAttemptId attemptID, WrappedJvmID jvmID);
/**
* Unregister the JVM and the attempt associated with it. This should be
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
index a9e8be6258..b827a2cdf3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
@@ -93,6 +93,7 @@ public void stop() {
public void receivedPing(TaskAttemptId attemptID) {
//only put for the registered attempts
+ //TODO throw an exception if the task isn't registered.
runningAttempts.replace(attemptID, clock.getTime());
}
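
receivedPing() relies on the put/replace distinction: ConcurrentMap.replace(key, value) updates only keys that are already present, so a late ping can never re-register an attempt that has been unregistered. A minimal sketch:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ReplaceDemo {
  public static void main(String[] args) {
    ConcurrentMap<String, Long> running =
        new ConcurrentHashMap<String, Long>();
    running.replace("attempt_1", 100L);           // no-op: key absent
    System.out.println(running.size());           // 0
    running.put("attempt_1", 100L);               // register the attempt
    running.replace("attempt_1", 200L);           // updates the live entry
    System.out.println(running.get("attempt_1")); // 200
  }
}
```
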
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index d9ed1e53f8..b296d02d55 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1201,7 +1201,7 @@ public void transition(TaskAttemptImpl taskAttempt,
// register it to TaskAttemptListener so that it can start monitoring it.
taskAttempt.taskAttemptListener
- .registerLaunchedTask(taskAttempt.attemptId);
+ .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
//TODO Resolve to host / IP in case of a local address.
InetSocketAddress nodeHttpInetAddr =
NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
index 47a0e55f58..8737864e41 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
@@ -19,6 +19,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -79,21 +80,21 @@ public void testGetTask() throws IOException {
assertNotNull(result);
assertTrue(result.shouldDie);
- // Verify ask after registration but before launch
+ // Verify ask after registration but before launch.
+ // Don't kill, should be null.
TaskAttemptId attemptID = mock(TaskAttemptId.class);
Task task = mock(Task.class);
//Now put a task with the ID
listener.registerPendingTask(task, wid);
result = listener.getTask(context);
- assertNotNull(result);
- assertFalse(result.shouldDie);
+ assertNull(result);
// Unregister for more testing.
listener.unregister(attemptID, wid);
// Verify ask after registration and launch
//Now put a task with the ID
listener.registerPendingTask(task, wid);
- listener.registerLaunchedTask(attemptID);
+ listener.registerLaunchedTask(attemptID, wid);
verify(hbHandler).register(attemptID);
result = listener.getTask(context);
assertNotNull(result);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index f17bf6f8af..3eb214d79c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -324,7 +324,9 @@ public InetSocketAddress getAddress() {
return NetUtils.createSocketAddr("localhost:54321");
}
@Override
- public void registerLaunchedTask(TaskAttemptId attemptID) {}
+ public void registerLaunchedTask(TaskAttemptId attemptID,
+ WrappedJvmID jvmID) {
+ }
@Override
public void unregister(TaskAttemptId attemptID, WrappedJvmID jvmID) {
}
@@ -463,6 +465,7 @@ protected StateMachine getStateMachine() {
return localStateMachine;
}
+ @SuppressWarnings("rawtypes")
public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener, Clock clock,
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
new file mode 100644
index 0000000000..12bb5ac0e7
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecution.java
@@ -0,0 +1,309 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSpeculativeExecution {
+
+ /*
+ * This class is used to control when speculative execution happens.
+ */
+ public static class TestSpecEstimator extends LegacyTaskRuntimeEstimator {
+ private static final long SPECULATE_THIS = 999999L;
+
+ public TestSpecEstimator() {
+ super();
+ }
+
+ /*
+ * This will only be called if speculative execution is turned on.
+ *
+ * If either mapper or reducer speculation is turned on, this will be
+ * called.
+ *
+ * This will cause speculation to engage for the first mapper or first
+ * reducer (that is, attempt ID "*_m_000000_0" or "*_r_000000_0")
+ *
+ * If this attempt is killed, the retry will have attempt id 1, so it
+ * will not engage speculation again.
+ */
+ @Override
+ public long estimatedRuntime(TaskAttemptId id) {
+ if ((id.getTaskId().getId() == 0) && (id.getId() == 0)) {
+ return SPECULATE_THIS;
+ }
+ return super.estimatedRuntime(id);
+ }
+ }
+
+ private static final Log LOG = LogFactory.getLog(TestSpeculativeExecution.class);
+
+ protected static MiniMRYarnCluster mrCluster;
+
+ private static Configuration initialConf = new Configuration();
+ private static FileSystem localFs;
+ static {
+ try {
+ localFs = FileSystem.getLocal(initialConf);
+ } catch (IOException io) {
+ throw new RuntimeException("problem getting local fs", io);
+ }
+ }
+
+ private static Path TEST_ROOT_DIR =
+ new Path("target",TestSpeculativeExecution.class.getName() + "-tmpDir")
+ .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
+ static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+ private static Path TEST_OUT_DIR = new Path(TEST_ROOT_DIR, "test.out.dir");
+
+ @BeforeClass
+ public static void setup() throws IOException {
+
+ if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
+ LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ + " not found. Not running test.");
+ return;
+ }
+
+ if (mrCluster == null) {
+ mrCluster = new MiniMRYarnCluster(TestSpeculativeExecution.class.getName(), 4);
+ Configuration conf = new Configuration();
+ mrCluster.init(conf);
+ mrCluster.start();
+ }
+
+ // workaround the absent public distcache.
+ localFs.copyFromLocalFile(new Path(MiniMRYarnCluster.APPJAR), APP_JAR);
+ localFs.setPermission(APP_JAR, new FsPermission("700"));
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (mrCluster != null) {
+ mrCluster.stop();
+ mrCluster = null;
+ }
+ }
+
+ public static class SpeculativeMapper extends
+ Mapper<Object, Text, Text, IntWritable> {