MAPREDUCE-2836. Provide option to fail jobs when submitted to non-existent fair scheduler pools. Contributed by Ahmed Radwan.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1171832 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0d99429982
commit
ca333f17c1
@ -17,6 +17,9 @@ Trunk (unreleased changes)
|
||||
MAPREDUCE-2934. MR portion of HADOOP-7607 - Simplify the RPC proxy cleanup
|
||||
process (atm)
|
||||
|
||||
MAPREDUCE-2836. Provide option to fail jobs when submitted to non-existent
|
||||
fair scheduler pools. (Ahmed Radwan via todd)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and
|
||||
|
@ -42,6 +42,7 @@
|
||||
import org.apache.hadoop.metrics.MetricsUtil;
|
||||
import org.apache.hadoop.metrics.Updater;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* A {@link TaskScheduler} that implements fair sharing.
|
||||
@ -98,6 +99,14 @@ public class FairScheduler extends TaskScheduler {
|
||||
protected long lastHeartbeatTime; // Time we last ran assignTasks
|
||||
private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
|
||||
|
||||
/**
|
||||
* A configuration property that controls the ability of submitting jobs to
|
||||
* pools not declared in the scheduler allocation file.
|
||||
*/
|
||||
public final static String ALLOW_UNDECLARED_POOLS_KEY =
|
||||
"mapred.fairscheduler.allow.undeclared.pools";
|
||||
private boolean allowUndeclaredPools = false;
|
||||
|
||||
/**
|
||||
* A class for holding per-job scheduler variables. These always contain the
|
||||
* values of the variables at the last update(), and are used along with a
|
||||
@ -195,6 +204,8 @@ public void start() {
|
||||
"mapred.fairscheduler.locality.delay.node", defaultDelay);
|
||||
rackLocalityDelay = conf.getLong(
|
||||
"mapred.fairscheduler.locality.delay.rack", defaultDelay);
|
||||
allowUndeclaredPools = conf.getBoolean(ALLOW_UNDECLARED_POOLS_KEY, true);
|
||||
|
||||
if (defaultDelay == -1 &&
|
||||
(nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
|
||||
autoComputeLocalityDelay = true; // Compute from heartbeat interval
|
||||
@ -1098,4 +1109,22 @@ boolean isPreemptionEnabled() {
|
||||
long getLastPreemptionUpdateTime() {
|
||||
return lastPreemptionUpdateTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Examines the job's pool name to determine if it is a declared pool name (in
|
||||
* the scheduler allocation file).
|
||||
*/
|
||||
@Override
|
||||
public void checkJobSubmission(JobInProgress job)
|
||||
throws UndeclaredPoolException {
|
||||
Set<String> declaredPools = poolMgr.getDeclaredPools();
|
||||
if (!this.allowUndeclaredPools
|
||||
&& !declaredPools.contains(poolMgr.getPoolName(job)))
|
||||
throw new UndeclaredPoolException("Pool name: '"
|
||||
+ poolMgr.getPoolName(job)
|
||||
+ "' is invalid. Add pool name to the fair scheduler allocation "
|
||||
+ "file. Valid pools are: "
|
||||
+ StringUtils.join(", ", declaredPools));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -28,6 +28,8 @@
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
@ -114,6 +116,8 @@ public class PoolManager {
|
||||
private long lastSuccessfulReload; // Last time we successfully reloaded pools
|
||||
private boolean lastReloadAttemptFailed = false;
|
||||
|
||||
private Set<String> declaredPools = new TreeSet<String>();
|
||||
|
||||
public PoolManager(FairScheduler scheduler) {
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
@ -370,6 +374,8 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
|
||||
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
|
||||
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
|
||||
this.defaultSchedulingMode = defaultSchedulingMode;
|
||||
this.declaredPools = Collections.unmodifiableSet(new TreeSet<String>(
|
||||
poolNamesInAllocFile));
|
||||
for (String name: poolNamesInAllocFile) {
|
||||
Pool pool = getPool(name);
|
||||
if (poolModes.containsKey(name)) {
|
||||
@ -543,4 +549,9 @@ synchronized void updateMetrics() {
|
||||
pool.updateMetrics();
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized Set<String> getDeclaredPools() {
|
||||
return declaredPools;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,32 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Thrown when the pool is not declared in the fair scheduler allocation file.
|
||||
*/
|
||||
public class UndeclaredPoolException extends IOException {
|
||||
|
||||
private static final long serialVersionUID = -3559057276650280117L;
|
||||
|
||||
public UndeclaredPoolException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
@ -0,0 +1,182 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.mapred;
|
||||
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestFairSchedulerPoolNames {
|
||||
|
||||
final static String TEST_DIR = new File(System.getProperty("test.build.data",
|
||||
"build/contrib/streaming/test/data")).getAbsolutePath();
|
||||
final static String ALLOC_FILE = new File(TEST_DIR, "test-pools")
|
||||
.getAbsolutePath();
|
||||
|
||||
private static final String POOL_PROPERTY = "pool";
|
||||
private String namenode;
|
||||
private MiniDFSCluster miniDFSCluster = null;
|
||||
private MiniMRCluster miniMRCluster = null;
|
||||
|
||||
/**
|
||||
* Note that The PoolManager.ALLOW_UNDECLARED_POOLS_KEY property is set to
|
||||
* false. So, the default pool is not added, and only pool names in the
|
||||
* scheduler allocation file are considered valid.
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
new File(TEST_DIR).mkdirs(); // Make sure data directory exists
|
||||
// Create an allocation file with only one pool defined.
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<pool name=\"poolA\">");
|
||||
out.println("<minMaps>1</minMaps>");
|
||||
out.println("<minReduces>1</minReduces>");
|
||||
out.println("</pool>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
|
||||
namenode = miniDFSCluster.getFileSystem().getUri().toString();
|
||||
|
||||
JobConf clusterConf = new JobConf();
|
||||
clusterConf.set("mapred.jobtracker.taskScheduler", FairScheduler.class
|
||||
.getName());
|
||||
clusterConf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
|
||||
clusterConf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
|
||||
clusterConf.setBoolean(FairScheduler.ALLOW_UNDECLARED_POOLS_KEY, false);
|
||||
miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, clusterConf);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (miniDFSCluster != null) {
|
||||
miniDFSCluster.shutdown();
|
||||
}
|
||||
if (miniMRCluster != null) {
|
||||
miniMRCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void submitJob(String pool) throws IOException {
|
||||
JobConf conf = new JobConf();
|
||||
final Path inDir = new Path("/tmp/testing/wc/input");
|
||||
final Path outDir = new Path("/tmp/testing/wc/output");
|
||||
FileSystem fs = FileSystem.get(URI.create(namenode), conf);
|
||||
fs.delete(outDir, true);
|
||||
if (!fs.mkdirs(inDir)) {
|
||||
throw new IOException("Mkdirs failed to create " + inDir.toString());
|
||||
}
|
||||
DataOutputStream file = fs.create(new Path(inDir, "part-00000"));
|
||||
file.writeBytes("Sample text");
|
||||
file.close();
|
||||
|
||||
FileSystem.setDefaultUri(conf, namenode);
|
||||
conf.set("mapred.job.tracker", "localhost:"
|
||||
+ miniMRCluster.getJobTrackerPort());
|
||||
conf.setJobName("wordcount");
|
||||
conf.setInputFormat(TextInputFormat.class);
|
||||
|
||||
// the keys are words (strings)
|
||||
conf.setOutputKeyClass(Text.class);
|
||||
// the values are counts (ints)
|
||||
conf.setOutputValueClass(IntWritable.class);
|
||||
|
||||
conf.setMapperClass(WordCount.MapClass.class);
|
||||
conf.setCombinerClass(WordCount.Reduce.class);
|
||||
conf.setReducerClass(WordCount.Reduce.class);
|
||||
|
||||
FileInputFormat.setInputPaths(conf, inDir);
|
||||
FileOutputFormat.setOutputPath(conf, outDir);
|
||||
conf.setNumMapTasks(1);
|
||||
conf.setNumReduceTasks(0);
|
||||
|
||||
if (pool != null) {
|
||||
conf.set(POOL_PROPERTY, pool);
|
||||
}
|
||||
|
||||
JobClient.runJob(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests job submission using the default pool name.
|
||||
*/
|
||||
@Test
|
||||
public void testDefaultPoolName() {
|
||||
Throwable t = null;
|
||||
try {
|
||||
submitJob(null);
|
||||
} catch (Exception e) {
|
||||
t = e;
|
||||
}
|
||||
assertNotNull("No exception during submission", t);
|
||||
assertTrue("Incorrect exception message", t.getMessage().contains(
|
||||
"Add pool name to the fair scheduler allocation file"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests job submission using a valid pool name (i.e., name exists in the fair
|
||||
* scheduler allocation file).
|
||||
*/
|
||||
@Test
|
||||
public void testValidPoolName() {
|
||||
Throwable t = null;
|
||||
try {
|
||||
submitJob("poolA");
|
||||
} catch (Exception e) {
|
||||
t = e;
|
||||
}
|
||||
assertNull("Exception during submission", t);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests job submission using an invalid pool name (i.e., name doesn't exist
|
||||
* in the fair scheduler allocation file).
|
||||
*/
|
||||
@Test
|
||||
public void testInvalidPoolName() {
|
||||
Throwable t = null;
|
||||
try {
|
||||
submitJob("poolB");
|
||||
} catch (Exception e) {
|
||||
t = e;
|
||||
}
|
||||
assertNotNull("No exception during submission", t);
|
||||
assertTrue("Incorrect exception message", t.getMessage().contains(
|
||||
"Add pool name to the fair scheduler allocation file"));
|
||||
}
|
||||
|
||||
}
|
@ -192,6 +192,15 @@
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
mapred.fairscheduler.allow.undeclared.pools
|
||||
</td>
|
||||
<td>
|
||||
Boolean property for enabling job submission to pools not declared
|
||||
in the allocation file. Default: true.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
mapred.fairscheduler.allocation.file
|
||||
</td>
|
||||
|
@ -3161,6 +3161,13 @@ private JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID,
|
||||
throw ace;
|
||||
}
|
||||
|
||||
try {
|
||||
this.taskScheduler.checkJobSubmission(job);
|
||||
} catch (IOException ioe){
|
||||
LOG.error("Problem in submitting job " + jobId, ioe);
|
||||
throw ioe;
|
||||
}
|
||||
|
||||
// Check the job if it cannot run in the cluster because of invalid memory
|
||||
// requirements.
|
||||
try {
|
||||
|
@ -104,4 +104,14 @@ public abstract List<Task> assignTasks(TaskTracker taskTracker)
|
||||
QueueRefresher getQueueRefresher() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclasses can override to provide any scheduler-specific checking
|
||||
* mechanism for job submission.
|
||||
* @param job
|
||||
* @throws IOException
|
||||
*/
|
||||
public void checkJobSubmission(JobInProgress job) throws IOException{
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user