diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 5e704f153f..f522d6c5d1 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -77,6 +77,10 @@ Trunk (unreleased changes)
 
     HADOOP-7899. Generate proto java files as part of the build. (tucu)
 
+    HADOOP-7574. Improve FSShell -stat, add user/group elements (XieXianshan via harsh)
+
+    HADOOP-7348. Change 'addnl' in getmerge util to be a flag '-nl' instead (XieXianshan via harsh)
+
   BUGS
 
     HADOOP-7851. Configuration.getClasses() never returns the default value.
diff --git a/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/file_system_shell.xml b/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/file_system_shell.xml
index ef4f76e5c8..6206a968a7 100644
--- a/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/file_system_shell.xml
+++ b/hadoop-common-project/hadoop-common/src/main/docs/src/documentation/content/xdocs/file_system_shell.xml
@@ -260,11 +260,11 @@
getmerge

-	Usage: hdfs dfs -getmerge <src> <localdst> [addnl]
+	Usage: hdfs dfs -getmerge [-nl] <src> <localdst>

 	  Takes a source directory and a destination file as input and concatenates files in src into the destination local file.
-	  Optionally addnl can be set to enable adding a newline character at the end of each file.
+	  Optionally, the -nl flag can be set to add a newline character at the end of each file during the merge.
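For illustration, a minimal sketch (not part of the patch) that drives both shell features this patch touches, the new -getmerge -nl flag and the new %u/%g stat format elements, through the FsShell Tool entry point. The class name and paths are hypothetical.

    // Hypothetical driver; FsShell and ToolRunner are the real entry points,
    // but the class name and paths here are illustrative only.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FsShell;
    import org.apache.hadoop.util.ToolRunner;

    public class ShellUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Concatenate the files under /user/alice/logs into one local file,
        // adding a newline at the end of each file (the new -nl flag).
        ToolRunner.run(conf, new FsShell(), new String[] {
            "-getmerge", "-nl", "/user/alice/logs", "/tmp/merged.txt"});
        // Print owner, group and filename (the new %u and %g elements).
        ToolRunner.run(conf, new FsShell(), new String[] {
            "-stat", "%u %g %n", "/user/alice/logs"});
      }
    }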

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index 066e5fdb89..5260d9c80d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -45,26 +45,22 @@ public static void registerCommands(CommandFactory factory) {
   /** merge multiple files together */
   public static class Merge extends FsCommand {
     public static final String NAME = "getmerge";
-    public static final String USAGE = "<src> <localdst> [addnl]";
+    public static final String USAGE = "[-nl] <src> <localdst>";
     public static final String DESCRIPTION =
       "Get all the files in the directories that\n" +
       "match the source file pattern and merge and sort them to only\n" +
-      "one file on local fs. <src> is kept.";
+      "one file on local fs. <src> is kept.\n" +
+      " -nl Add a newline character at the end of each file.";
 
     protected PathData dst = null;
     protected String delimiter = null;
 
     @Override
     protected void processOptions(LinkedList<String> args) throws IOException {
-      CommandFormat cf = new CommandFormat(2, 3);
+      CommandFormat cf = new CommandFormat(2, 3, "nl");
       cf.parse(args);
 
-      // TODO: this really should be a -nl option
-      if ((args.size() > 2) && Boolean.parseBoolean(args.removeLast())) {
-        delimiter = "\n";
-      } else {
-        delimiter = null;
-      }
+      delimiter = cf.getOpt("nl") ? "\n" : null;
 
       dst = new PathData(new File(args.removeLast()), getConf());
     }
@@ -197,4 +193,4 @@ public static class CopyToLocal extends Get {
     public static final String USAGE = Get.USAGE;
     public static final String DESCRIPTION = "Identical to the -get command.";
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
index e8a731a335..d6034390bb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/Stat.java
@@ -32,9 +32,11 @@
  * Print statistics about path in specified format.
  * Format sequences:
  *   %b: Size of file in blocks
+ *   %g: Group name of owner
  *   %n: Filename
  *   %o: Block size
  *   %r: replication
+ *   %u: User name of owner
  *   %y: UTC date as "yyyy-MM-dd HH:mm:ss"
  *   %Y: Milliseconds since January 1, 1970 UTC
  */
@@ -50,8 +52,8 @@ public static void registerCommands(CommandFactory factory) {
   public static final String USAGE = "[format] <path> ...";
   public static final String DESCRIPTION =
     "Print statistics about the file/directory at <path>\n" +
-    "in the specified format. Format accepts filesize in blocks (%b), filename (%n),\n" +
-    "block size (%o), replication (%r), modification date (%y, %Y)\n";
+    "in the specified format. Format accepts filesize in blocks (%b), group name of owner(%g),\n" +
+    "filename (%n), block size (%o), replication (%r), user name of owner(%u), modification date (%y, %Y)\n";
 
   protected static final SimpleDateFormat timeFmt;
   static {
@@ -92,6 +94,9 @@ protected void processPath(PathData item) throws IOException {
             ? "directory"
             : (stat.isFile() ? "regular file" : "symlink"));
           break;
+        case 'g':
+          buf.append(stat.getGroup());
+          break;
         case 'n':
           buf.append(item.path.getName());
           break;
@@ -101,6 +106,9 @@ protected void processPath(PathData item) throws IOException {
         case 'r':
           buf.append(stat.getReplication());
           break;
+        case 'u':
+          buf.append(stat.getOwner());
+          break;
         case 'y':
           buf.append(timeFmt.format(new Date(stat.getModificationTime())));
           break;
@@ -118,4 +126,4 @@ protected void processPath(PathData item) throws IOException {
     }
     out.println(buf.toString());
   }
-}
\ No newline at end of file
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
index b508c304c8..b1ce87e888 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
@@ -449,7 +449,7 @@
         RegexpComparator
-        ^-getmerge <src> <localdst> \[addnl\]:( |\t)*Get all the files in the directories that( )*
+        ^-getmerge \[-nl\] <src> <localdst>:( |\t)*Get all the files in the directories that( )*
 
         RegexpComparator
@@ -459,6 +459,10 @@
         RegexpComparator
         ^( |\t)*one file on local fs. <src> is kept.( )*
+
+        RegexpComparator
+        ^( |\t)*-nl Add a newline character at the end of each file.( )*
+
@@ -606,11 +610,11 @@
         RegexpComparator
-        ^( |\t)*in the specified format. Format accepts filesize in blocks \(%b\), filename \(%n\),( )*
+        ^( |\t)*in the specified format. Format accepts filesize in blocks \(%b\), group name of owner\(%g\),( )*
 
         RegexpComparator
-        ^( |\t)*block size \(%o\), replication \(%r\), modification date \(%y, %Y\)( )*
+        ^( |\t)*filename \(%n\), block size \(%o\), replication \(%r\), user name of owner\(%u\), modification date \(%y, %Y\)( )*
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c00919a4e0..1c4926f245 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -49,6 +49,8 @@ Trunk (unreleased changes)
     Move the support for multiple protocols to lower layer so that Writable,
     PB and Avro can all use it (Sanjay)
 
+    MAPREDUCE-2944. Improve checking of input for JobClient.displayTasks() (XieXianshan via harsh)
+
   BUG FIXES
 
     MAPREDUCE-3349. Log rack-name in JobHistory for unsuccessful tasks.
     (Devaraj K and Amar Kamat via amarrk)
@@ -175,6 +177,12 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar Vavilapalli via sseth)
 
+    MAPREDUCE-3399. Modifying ContainerLocalizer to send a heartbeat to NM
+    immediately after downloading a resource instead of always waiting for a
+    second. (Siddarth Seth via vinodkv)
+
+    MAPREDUCE-3568. Optimized Job's progress calculations in MR AM. (vinodkv)
+
   BUG FIXES
 
     MAPREDUCE-3221.
Reenabled the previously ignored test in TestSubmitJob diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 5c2e0fd0c8..8ba241ec02 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -228,7 +228,7 @@ public void init(final Configuration conf) { + recoveryEnabled + " recoverySupportedByCommitter: " + recoverySupportedByCommitter + " ApplicationAttemptID: " + appAttemptID.getAttemptId()); - dispatcher = new AsyncDispatcher(); + dispatcher = createDispatcher(); addIfService(dispatcher); } @@ -291,6 +291,10 @@ public void init(final Configuration conf) { super.init(conf); } // end of init() + protected Dispatcher createDispatcher() { + return new AsyncDispatcher(); + } + private OutputCommitter createOutputCommitter(Configuration conf) { OutputCommitter committer = null; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java index 4abcd34184..9094c77cc3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java @@ -53,6 +53,7 @@ public interface Job { int getTotalReduces(); int getCompletedMaps(); int getCompletedReduces(); + float getProgress(); boolean isUber(); String getUserName(); String getQueueName(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index cb9171cedc..183f15156c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -128,6 +128,10 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private final String username; private final OutputCommitter committer; private final Map jobACLs; + private float setupWeight = 0.05f; + private float cleanupWeight = 0.05f; + private float mapWeight = 0.0f; + private float reduceWeight = 0.0f; private final Set completedTasksFromPreviousRun; private final List amInfos; private final Lock readLock; @@ -147,7 +151,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private final long appSubmitTime; private boolean lazyTasksCopyNeeded = false; - private volatile Map tasks = new LinkedHashMap(); + volatile Map tasks = new LinkedHashMap(); private Counters jobCounters = newCounters(); // FIXME: // @@ -353,6 
+357,8 @@ JobEventType.JOB_KILL, new KillTasksTransition()) private long startTime; private long finishTime; private float setupProgress; + private float mapProgress; + private float reduceProgress; private float cleanupProgress; private boolean isUber = false; @@ -587,30 +593,51 @@ public JobReport getReport() { cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); } + computeProgress(); return MRBuilderUtils.newJobReport(jobId, jobName, username, state, appSubmitTime, startTime, finishTime, setupProgress, - computeProgress(mapTasks), computeProgress(reduceTasks), + this.mapProgress, this.reduceProgress, cleanupProgress, remoteJobConfFile.toString(), amInfos, isUber); } finally { readLock.unlock(); } } - private float computeProgress(Set taskIds) { - readLock.lock(); + @Override + public float getProgress() { + this.readLock.lock(); try { - float progress = 0; - for (TaskId taskId : taskIds) { - Task task = tasks.get(taskId); - progress += task.getProgress(); - } - int taskIdsSize = taskIds.size(); - if (taskIdsSize != 0) { - progress = progress/taskIdsSize; - } - return progress; + computeProgress(); + return (this.setupProgress * this.setupWeight + this.cleanupProgress + * this.cleanupWeight + this.mapProgress * this.mapWeight + this.reduceProgress + * this.reduceWeight); } finally { - readLock.unlock(); + this.readLock.unlock(); + } + } + + private void computeProgress() { + this.readLock.lock(); + try { + float mapProgress = 0f; + float reduceProgress = 0f; + for (Task task : this.tasks.values()) { + if (task.getType() == TaskType.MAP) { + mapProgress += task.getProgress(); + } else { + reduceProgress += task.getProgress(); + } + } + if (this.numMapTasks != 0) { + mapProgress = mapProgress / this.numMapTasks; + } + if (this.numReduceTasks != 0) { + reduceProgress = reduceProgress / this.numReduceTasks; + } + this.mapProgress = mapProgress; + this.reduceProgress = reduceProgress; + } finally { + this.readLock.unlock(); } } @@ -731,7 +758,7 @@ protected FileSystem getFileSystem(Configuration conf) throws IOException { static JobState checkJobCompleteSuccess(JobImpl job) { // check for Job success - if (job.completedTaskCount == job.getTasks().size()) { + if (job.completedTaskCount == job.tasks.size()) { try { // Commit job & do cleanup job.getCommitter().commitJob(job.getJobContext()); @@ -970,6 +997,12 @@ public JobState transition(JobImpl job, JobEvent event) { if (job.numMapTasks == 0 && job.numReduceTasks == 0) { job.addDiagnostic("No of maps and reduces are 0 " + job.jobId); + } else if (job.numMapTasks == 0) { + job.reduceWeight = 0.9f; + } else if (job.numReduceTasks == 0) { + job.mapWeight = 0.9f; + } else { + job.mapWeight = job.reduceWeight = 0.45f; } checkTaskLimits(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index b070854083..b403751154 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -376,7 +376,7 @@ public float getProgress() { try { TaskAttempt bestAttempt = selectBestAttempt(); if (bestAttempt == null) { - return 0; + return 
0f; } return bestAttempt.getProgress(); } finally { @@ -457,9 +457,10 @@ private TaskAttempt selectBestAttempt() { result = at; //The first time around } // calculate the best progress - if (at.getProgress() > progress) { + float attemptProgress = at.getProgress(); + if (attemptProgress > progress) { result = at; - progress = at.getProgress(); + progress = attemptProgress; } } return result; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java index 4281e0a484..5276276c4e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java @@ -128,25 +128,7 @@ protected Job getJob() { protected float getApplicationProgress() { // For now just a single job. In future when we have a DAG, we need an // aggregate progress. - JobReport report = this.job.getReport(); - float setupWeight = 0.05f; - float cleanupWeight = 0.05f; - float mapWeight = 0.0f; - float reduceWeight = 0.0f; - int numMaps = this.job.getTotalMaps(); - int numReduces = this.job.getTotalReduces(); - if (numMaps == 0 && numReduces == 0) { - } else if (numMaps == 0) { - reduceWeight = 0.9f; - } else if (numReduces == 0) { - mapWeight = 0.9f; - } else { - mapWeight = reduceWeight = 0.45f; - } - return (report.getSetupProgress() * setupWeight - + report.getCleanupProgress() * cleanupWeight - + report.getMapProgress() * mapWeight + report.getReduceProgress() - * reduceWeight); + return this.job.getProgress(); } protected void register() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 2f25075ee8..7aa638afe6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -161,7 +161,7 @@ protected AMResponse makeRemoteRequest() throws YarnRemoteException { " finishedContainers=" + response.getCompletedContainersStatuses().size() + " resourcelimit=" + availableResources + - "knownNMs=" + clusterNmCount); + " knownNMs=" + clusterNmCount); ask.clear(); release.clear(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java index 561ecac8a9..11a8671707 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java @@ -115,7 +115,8 @@ public 
class MRApp extends MRAppMaster { applicationId.setId(0); } - public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) { + public MRApp(int maps, int reduces, boolean autoComplete, String testName, + boolean cleanOnStart) { this(maps, reduces, autoComplete, testName, cleanOnStart, 1); } @@ -141,10 +142,17 @@ private static ContainerId getContainerId(ApplicationId applicationId, return containerId; } - public MRApp(int maps, int reduces, boolean autoComplete, String testName, + public MRApp(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) { - super(getApplicationAttemptId(applicationId, startCount), getContainerId( - applicationId, startCount), NM_HOST, NM_PORT, NM_HTTP_PORT, System + this(getApplicationAttemptId(applicationId, startCount), getContainerId( + applicationId, startCount), maps, reduces, autoComplete, testName, + cleanOnStart, startCount); + } + + public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId, + int maps, int reduces, boolean autoComplete, String testName, + boolean cleanOnStart, int startCount) { + super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, System .currentTimeMillis()); this.testWorkDir = new File("target", testName); testAbsPath = new Path(testWorkDir.getAbsolutePath()); @@ -205,9 +213,9 @@ public void waitForState(Task task, TaskState finalState) throws Exception { TaskReport report = task.getReport(); while (!finalState.equals(report.getTaskState()) && timeoutSecs++ < 20) { - System.out.println("Task State is : " + report.getTaskState() + - " Waiting for state : " + finalState + - " progress : " + report.getProgress()); + System.out.println("Task State for " + task.getID() + " is : " + + report.getTaskState() + " Waiting for state : " + finalState + + " progress : " + report.getProgress()); report = task.getReport(); Thread.sleep(500); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java index 5a67576c44..ad3e4a8712 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java @@ -425,6 +425,11 @@ public JobReport getReport() { return report; } + @Override + public float getProgress() { + return 0; + } + @Override public Counters getCounters() { return counters; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 785d8a7d03..de3909ea42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; +import java.util.Iterator; 
import java.util.List; import java.util.Set; @@ -36,15 +37,20 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; -import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; -import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; @@ -78,6 +84,7 @@ import org.junit.After; import org.junit.Test; +@SuppressWarnings("unchecked") public class TestRMContainerAllocator { static final Log LOG = LogFactory @@ -338,98 +345,155 @@ protected ResourceScheduler createScheduler() { } } - private static class FakeJob extends JobImpl { - - public FakeJob(ApplicationAttemptId appAttemptID, Configuration conf, - int numMaps, int numReduces) { - super(MRBuilderUtils.newJobId(appAttemptID.getApplicationId(), 0), - appAttemptID, conf, null, null, null, null, null, null, null, null, - true, null, System.currentTimeMillis(), null); - this.jobId = getID(); - this.numMaps = numMaps; - this.numReduces = numReduces; - } - - private float setupProgress; - private float mapProgress; - private float reduceProgress; - private float cleanupProgress; - private final int numMaps; - private final int numReduces; - private JobId jobId; - - void setProgress(float setupProgress, float mapProgress, - float reduceProgress, float cleanupProgress) { - this.setupProgress = setupProgress; - this.mapProgress = mapProgress; - this.reduceProgress = reduceProgress; - this.cleanupProgress = cleanupProgress; - } - - @Override - public int getTotalMaps() { return this.numMaps; } - @Override - public int getTotalReduces() { return this.numReduces;} - - @Override - public JobReport getReport() { - return MRBuilderUtils.newJobReport(this.jobId, "job", "user", - JobState.RUNNING, 0, 0, 0, this.setupProgress, this.mapProgress, - this.reduceProgress, this.cleanupProgress, "jobfile", null, false); - } - } - @Test public void testReportedAppProgress() throws Exception { LOG.info("Running testReportedAppProgress"); Configuration conf = new Configuration(); - MyResourceManager rm = new MyResourceManager(conf); + final MyResourceManager rm = new MyResourceManager(conf); rm.start(); - DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + DrainDispatcher rmDispatcher = (DrainDispatcher) rm.getRMContext() .getDispatcher(); // Submit the application - RMApp app = rm.submitApp(1024); - dispatcher.await(); + RMApp rmApp = rm.submitApp(1024); + 
rmDispatcher.await(); - MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + MockNM amNodeManager = rm.registerNode("amNM:1234", 21504); amNodeManager.nodeHeartbeat(true); - dispatcher.await(); + rmDispatcher.await(); - ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + final ApplicationAttemptId appAttemptId = rmApp.getCurrentAppAttempt() .getAppAttemptId(); rm.sendAMLaunched(appAttemptId); - dispatcher.await(); + rmDispatcher.await(); - FakeJob job = new FakeJob(appAttemptId, conf, 2, 2); - MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, - appAttemptId, job); + MRApp mrApp = new MRApp(appAttemptId, BuilderUtils.newContainerId( + appAttemptId, 0), 10, 10, false, this.getClass().getName(), true, 1) { + @Override + protected Dispatcher createDispatcher() { + return new DrainDispatcher(); + } + protected ContainerAllocator createContainerAllocator( + ClientService clientService, AppContext context) { + return new MyContainerAllocator(rm, appAttemptId, context); + }; + }; + + Assert.assertEquals(0.0, rmApp.getProgress(), 0.0); + + mrApp.submit(conf); + Job job = mrApp.getContext().getAllJobs().entrySet().iterator().next() + .getValue(); + + DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher(); + + MyContainerAllocator allocator = (MyContainerAllocator) mrApp + .getContainerAllocator(); + + mrApp.waitForState(job, JobState.RUNNING); + + amDispatcher.await(); + // Wait till all map-attempts request for containers + for (Task t : job.getTasks().values()) { + if (t.getType() == TaskType.MAP) { + mrApp.waitForState(t.getAttempts().values().iterator().next(), + TaskAttemptState.UNASSIGNED); + } + } + amDispatcher.await(); + + allocator.schedule(); + rmDispatcher.await(); + amNodeManager.nodeHeartbeat(true); + rmDispatcher.await(); + allocator.schedule(); + rmDispatcher.await(); + + // Wait for all map-tasks to be running + for (Task t : job.getTasks().values()) { + if (t.getType() == TaskType.MAP) { + mrApp.waitForState(t, TaskState.RUNNING); + } + } allocator.schedule(); // Send heartbeat - dispatcher.await(); - Assert.assertEquals(0.0, app.getProgress(), 0.0); + rmDispatcher.await(); + Assert.assertEquals(0.05f, job.getProgress(), 0.001f); + Assert.assertEquals(0.05f, rmApp.getProgress(), 0.001f); - job.setProgress(100, 10, 0, 0); + // Finish off 1 map. 
+ Iterator it = job.getTasks().values().iterator(); + finishNextNTasks(mrApp, it, 1); allocator.schedule(); - dispatcher.await(); - Assert.assertEquals(9.5f, app.getProgress(), 0.0); + rmDispatcher.await(); + Assert.assertEquals(0.095f, job.getProgress(), 0.001f); + Assert.assertEquals(0.095f, rmApp.getProgress(), 0.001f); - job.setProgress(100, 80, 0, 0); + // Finish off 7 more so that map-progress is 80% + finishNextNTasks(mrApp, it, 7); allocator.schedule(); - dispatcher.await(); - Assert.assertEquals(41.0f, app.getProgress(), 0.0); + rmDispatcher.await(); + Assert.assertEquals(0.41f, job.getProgress(), 0.001f); + Assert.assertEquals(0.41f, rmApp.getProgress(), 0.001f); - job.setProgress(100, 100, 20, 0); - allocator.schedule(); - dispatcher.await(); - Assert.assertEquals(59.0f, app.getProgress(), 0.0); + // Finish off the 2 remaining maps + finishNextNTasks(mrApp, it, 2); + + // Wait till all reduce-attempts request for containers + for (Task t : job.getTasks().values()) { + if (t.getType() == TaskType.REDUCE) { + mrApp.waitForState(t.getAttempts().values().iterator().next(), + TaskAttemptState.UNASSIGNED); + } + } - job.setProgress(100, 100, 100, 100); allocator.schedule(); - dispatcher.await(); - Assert.assertEquals(100.0f, app.getProgress(), 0.0); + rmDispatcher.await(); + amNodeManager.nodeHeartbeat(true); + rmDispatcher.await(); + allocator.schedule(); + rmDispatcher.await(); + + // Wait for all reduce-tasks to be running + for (Task t : job.getTasks().values()) { + if (t.getType() == TaskType.REDUCE) { + mrApp.waitForState(t, TaskState.RUNNING); + } + } + + // Finish off 2 reduces + finishNextNTasks(mrApp, it, 2); + + allocator.schedule(); + rmDispatcher.await(); + Assert.assertEquals(0.59f, job.getProgress(), 0.001f); + Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); + + // Finish off the remaining 8 reduces. 
+ finishNextNTasks(mrApp, it, 8); + allocator.schedule(); + rmDispatcher.await(); + // Remaining is JobCleanup + Assert.assertEquals(0.95f, job.getProgress(), 0.001f); + Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); + } + + private void finishNextNTasks(MRApp mrApp, Iterator it, int nextN) + throws Exception { + Task task; + for (int i=0; i it = job.getTasks().values().iterator(); - job.setProgress(100, 60, 0, 0); + // Finish off 1 map so that map-progress is 10% + finishNextNTasks(mrApp, it, 1); allocator.schedule(); - dispatcher.await(); - Assert.assertEquals(59.0f, app.getProgress(), 0.0); + rmDispatcher.await(); + Assert.assertEquals(0.14f, job.getProgress(), 0.001f); + Assert.assertEquals(0.14f, rmApp.getProgress(), 0.001f); - job.setProgress(100, 100, 0, 100); + // Finish off 5 more map so that map-progress is 60% + finishNextNTasks(mrApp, it, 5); allocator.schedule(); - dispatcher.await(); - Assert.assertEquals(100.0f, app.getProgress(), 0.0); + rmDispatcher.await(); + Assert.assertEquals(0.59f, job.getProgress(), 0.001f); + Assert.assertEquals(0.59f, rmApp.getProgress(), 0.001f); + + // Finish off remaining map so that map-progress is 100% + finishNextNTasks(mrApp, it, 4); + allocator.schedule(); + rmDispatcher.await(); + Assert.assertEquals(0.95f, job.getProgress(), 0.001f); + Assert.assertEquals(0.95f, rmApp.getProgress(), 0.001f); } @Test @@ -1000,7 +1114,6 @@ private static class MyContainerAllocator extends RMContainerAllocator { private MyResourceManager rm; - @SuppressWarnings("rawtypes") private static AppContext createAppContext( ApplicationAttemptId appAttemptId, Job job) { AppContext context = mock(AppContext.class); @@ -1028,7 +1141,15 @@ private static ClientService createMockClientService() { return service; } - MyContainerAllocator(MyResourceManager rm, Configuration conf, + // Use this constructor when using a real job. + MyContainerAllocator(MyResourceManager rm, + ApplicationAttemptId appAttemptId, AppContext context) { + super(createMockClientService(), context); + this.rm = rm; + } + + // Use this constructor when you are using a mocked job. 
+ public MyContainerAllocator(MyResourceManager rm, Configuration conf, ApplicationAttemptId appAttemptId, Job job) { super(createMockClientService(), createAppContext(appAttemptId, job)); this.rm = rm; @@ -1090,6 +1211,7 @@ public List schedule() { return result; } + @Override protected void startAllocatorThread() { // override to NOT start thread } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java index 303c488d70..c697a1967f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java @@ -393,6 +393,11 @@ public JobReport getReport() { throw new UnsupportedOperationException("Not supported yet."); } + @Override + public float getProgress() { + return 0; + } + @Override public Counters getCounters() { throw new UnsupportedOperationException("Not supported yet."); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 9bc18f920f..a142c085fa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -65,7 +65,7 @@ public void testJobNoTasksTransition() { Task mockTask = mock(Task.class); Map tasks = new HashMap(); tasks.put(mockTask.getID(), mockTask); - when(mockJob.getTasks()).thenReturn(tasks); + mockJob.tasks = tasks; when(mockJob.getState()).thenReturn(JobState.ERROR); JobEvent mockJobEvent = mock(JobEvent.class); @@ -73,11 +73,12 @@ public void testJobNoTasksTransition() { Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition", JobState.ERROR, state); } - + @Test public void testCheckJobCompleteSuccess() { JobImpl mockJob = mock(JobImpl.class); + mockJob.tasks = new HashMap(); OutputCommitter mockCommitter = mock(OutputCommitter.class); EventHandler mockEventHandler = mock(EventHandler.class); JobContext mockJobContext = mock(JobContext.class); @@ -110,7 +111,7 @@ public void testCheckJobCompleteSuccessFailed() { Task mockTask = mock(Task.class); Map tasks = new HashMap(); tasks.put(mockTask.getID(), mockTask); - when(mockJob.getTasks()).thenReturn(tasks); + mockJob.tasks = tasks; try { // Just in case the code breaks and reaches these calls diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java index fa3d799fe7..be9981a592 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java @@ -723,6 +723,8 @@ public TaskReport[] getReduceTaskReports(String jobId) throws IOException { * @param type the type of the task (map/reduce/setup/cleanup) * @param state the state of the task * (pending/running/completed/failed/killed) + * @throws IOException when there is an error communicating with the master + * @throws IllegalArgumentException if an invalid type/state is passed */ public void displayTasks(final JobID jobId, String type, String state) throws IOException { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java index f7ac9c40a6..c42456aafc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; +import java.util.HashSet; +import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,6 +59,10 @@ public class CLI extends Configured implements Tool { private static final Log LOG = LogFactory.getLog(CLI.class); protected Cluster cluster; + private final Set taskTypes = new HashSet( + Arrays.asList("map", "reduce", "setup", "cleanup")); + private final Set taskStates = new HashSet( + Arrays.asList("pending", "running", "completed", "failed", "killed")); public CLI() { } @@ -545,9 +552,21 @@ private void printTaskAttempts(TaskReport report) { * @param type the type of the task (map/reduce/setup/cleanup) * @param state the state of the task * (pending/running/completed/failed/killed) + * @throws IOException when there is an error communicating with the master + * @throws InterruptedException + * @throws IllegalArgumentException if an invalid type/state is passed */ protected void displayTasks(Job job, String type, String state) throws IOException, InterruptedException { + if (!taskTypes.contains(type)) { + throw new IllegalArgumentException("Invalid type: " + type + + ". Valid types for task are: map, reduce, setup, cleanup."); + } + if (!taskStates.contains(state)) { + throw new java.lang.IllegalArgumentException("Invalid state: " + state + + ". 
Valid states for task are: pending, running, completed, failed, killed."); + } + TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type)); for (TaskReport report : reports) { TIPStatus status = report.getCurrentStatus(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java index c1b308935a..fd828d7669 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java @@ -135,6 +135,11 @@ public JobReport getReport() { return report; } + @Override + public float getProgress() { + return 1.0f; + } + @Override public JobState getState() { return report.getJobState(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java index d412a63864..1b1b3b1ced 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java @@ -89,6 +89,11 @@ public JobReport getReport() { return jobReport; } + @Override + public float getProgress() { + return 1.0f; + } + @Override public Counters getCounters() { return null; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java index 4e03fa2a5a..cb1bfd1ab9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java @@ -31,7 +31,9 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -163,7 +165,8 @@ public LocalizationProtocol run() { ExecutorService exec = null; try { exec = createDownloadThreadPool(); - localizeFiles(nodeManager, exec, ugi); + CompletionService ecs = createCompletionService(exec); + localizeFiles(nodeManager, ecs, ugi); return 0; } catch (Throwable e) { // Print traces to stdout so that they can be logged by the NM address @@ -182,6 +185,10 @@ ExecutorService createDownloadThreadPool() { 
.setNameFormat("ContainerLocalizer Downloader").build()); } + CompletionService createCompletionService(ExecutorService exec) { + return new ExecutorCompletionService(exec); + } + Callable download(LocalDirAllocator lda, LocalResource rsrc, UserGroupInformation ugi) throws IOException { Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf); @@ -206,7 +213,8 @@ void sleep(int duration) throws InterruptedException { } private void localizeFiles(LocalizationProtocol nodemanager, - ExecutorService exec, UserGroupInformation ugi) throws IOException { + CompletionService cs, UserGroupInformation ugi) + throws IOException { while (true) { try { LocalizerStatus status = createStatus(); @@ -231,7 +239,7 @@ private void localizeFiles(LocalizationProtocol nodemanager, break; } // TODO: Synchronization?? - pendingResources.put(r, exec.submit(download(lda, r, ugi))); + pendingResources.put(r, cs.submit(download(lda, r, ugi))); } } break; @@ -247,8 +255,7 @@ private void localizeFiles(LocalizationProtocol nodemanager, } catch (YarnRemoteException e) { } return; } - // TODO HB immediately when rsrc localized - sleep(1); + cs.poll(1000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { return; } catch (YarnRemoteException e) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java index e4b68ffbc6..32a3367e9a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestContainerLocalizer.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -146,7 +147,8 @@ public void testContainerLocalizerMain() throws Exception { // return result instantly for deterministic test ExecutorService syncExec = mock(ExecutorService.class); - when(syncExec.submit(isA(Callable.class))) + CompletionService cs = mock(CompletionService.class); + when(cs.submit(isA(Callable.class))) .thenAnswer(new Answer>() { @Override public Future answer(InvocationOnMock invoc) @@ -159,6 +161,7 @@ public Future answer(InvocationOnMock invoc) } }); doReturn(syncExec).when(localizer).createDownloadThreadPool(); + doReturn(cs).when(localizer).createCompletionService(syncExec); // run localization assertEquals(0, localizer.runLocalization(nmAddr));
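For reference, the weighting scheme that Job.getProgress() now applies inside the AM (previously computed in RMCommunicator.getApplicationProgress(), removed above) can be sketched standalone. A minimal sketch; the class and method names are hypothetical, and only the weights and arithmetic come from the patch.

    // Setup and cleanup are pinned at 5% each; the remaining 90% is split
    // across whichever of the map/reduce phases actually have tasks.
    public class ProgressWeightsSketch {
      static float weightedProgress(int numMaps, int numReduces, float setup,
          float map, float reduce, float cleanup) {
        float setupWeight = 0.05f, cleanupWeight = 0.05f;
        float mapWeight = 0.0f, reduceWeight = 0.0f;
        if (numMaps == 0 && numReduces == 0) {
          // no tasks at all: only setup/cleanup contribute
        } else if (numMaps == 0) {
          reduceWeight = 0.9f;
        } else if (numReduces == 0) {
          mapWeight = 0.9f;
        } else {
          mapWeight = reduceWeight = 0.45f;
        }
        return setup * setupWeight + cleanup * cleanupWeight
            + map * mapWeight + reduce * reduceWeight;
      }

      public static void main(String[] args) {
        // Setup done, 8 of 10 maps finished, reduces not started:
        // 0.05 + 0.8 * 0.45 = 0.41, matching the 0.41f assertions in
        // TestRMContainerAllocator above.
        System.out.println(weightedProgress(10, 10, 1.0f, 0.8f, 0.0f, 0.0f));
      }
    }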
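Likewise, the ContainerLocalizer change above replaces a fixed sleep(1) with CompletionService.poll(1000, TimeUnit.MILLISECONDS), so the localizer can heartbeat as soon as a download completes instead of always waiting a full second. A self-contained sketch of that pattern, assuming an illustrative class name, pool size, task count and sleep times:

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class PollingHeartbeatSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        CompletionService<String> cs = new ExecutorCompletionService<String>(exec);
        for (int i = 0; i < 4; i++) {
          final int id = i;
          cs.submit(new Callable<String>() {
            public String call() throws Exception {
              Thread.sleep(100L * (id + 1)); // stand-in for a resource download
              return "resource-" + id;
            }
          });
        }
        int done = 0;
        while (done < 4) {
          // Returns as soon as any download finishes, or after at most one
          // second, so the "heartbeat" below is sent promptly either way.
          Future<String> f = cs.poll(1000, TimeUnit.MILLISECONDS);
          if (f != null) {
            done++;
            System.out.println("heartbeat: localized " + f.get());
          } else {
            System.out.println("heartbeat: still downloading");
          }
        }
        exec.shutdown();
      }
    }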