MAPREDUCE-3237. Move LocalJobRunner to hadoop-mapreduce-client-core. Contributed by Tom White.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1195792 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e5badc0c1a
commit
cfb6a9883d
@ -1900,6 +1900,9 @@ Release 0.23.0 - Unreleased
|
|||||||
MAPREDUCE-3192. Fix Javadoc warning in JobClient.java and Cluster.java.
|
MAPREDUCE-3192. Fix Javadoc warning in JobClient.java and Cluster.java.
|
||||||
(jitendra)
|
(jitendra)
|
||||||
|
|
||||||
|
MAPREDUCE-3237. Move LocalJobRunner to hadoop-mapreduce-client-core.
|
||||||
|
(tomwhite via acmurthy)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -0,0 +1,215 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.MalformedURLException;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.net.URLClassLoader;
|
||||||
|
import java.security.AccessController;
|
||||||
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.fs.FileContext;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||||
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A helper class for managing the distributed cache for {@link LocalJobRunner}.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
class LocalDistributedCacheManager {
|
||||||
|
public static final Log LOG =
|
||||||
|
LogFactory.getLog(LocalDistributedCacheManager.class);
|
||||||
|
|
||||||
|
private List<String> localArchives = new ArrayList<String>();
|
||||||
|
private List<String> localFiles = new ArrayList<String>();
|
||||||
|
private List<String> localClasspaths = new ArrayList<String>();
|
||||||
|
|
||||||
|
private boolean setupCalled = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set up the distributed cache by localizing the resources, and updating
|
||||||
|
* the configuration with references to the localized resources.
|
||||||
|
* @param conf
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void setup(JobConf conf) throws IOException {
|
||||||
|
// Generate YARN local resources objects corresponding to the distributed
|
||||||
|
// cache configuration
|
||||||
|
Map<String, LocalResource> localResources =
|
||||||
|
new LinkedHashMap<String, LocalResource>();
|
||||||
|
MRApps.setupDistributedCache(conf, localResources);
|
||||||
|
|
||||||
|
// Find which resources are to be put on the local classpath
|
||||||
|
Map<String, Path> classpaths = new HashMap<String, Path>();
|
||||||
|
Path[] archiveClassPaths = DistributedCache.getArchiveClassPaths(conf);
|
||||||
|
if (archiveClassPaths != null) {
|
||||||
|
for (Path p : archiveClassPaths) {
|
||||||
|
FileSystem remoteFS = p.getFileSystem(conf);
|
||||||
|
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
|
||||||
|
remoteFS.getWorkingDirectory()));
|
||||||
|
classpaths.put(p.toUri().getPath().toString(), p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Path[] fileClassPaths = DistributedCache.getFileClassPaths(conf);
|
||||||
|
if (fileClassPaths != null) {
|
||||||
|
for (Path p : fileClassPaths) {
|
||||||
|
FileSystem remoteFS = p.getFileSystem(conf);
|
||||||
|
p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
|
||||||
|
remoteFS.getWorkingDirectory()));
|
||||||
|
classpaths.put(p.toUri().getPath().toString(), p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Localize the resources
|
||||||
|
LocalDirAllocator localDirAllocator =
|
||||||
|
new LocalDirAllocator(MRConfig.LOCAL_DIR);
|
||||||
|
FileContext localFSFileContext = FileContext.getLocalFSFileContext();
|
||||||
|
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
|
||||||
|
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
|
||||||
|
ExecutorService exec = Executors.newCachedThreadPool();
|
||||||
|
for (LocalResource resource : localResources.values()) {
|
||||||
|
Callable<Path> download = new FSDownload(localFSFileContext, ugi, conf,
|
||||||
|
localDirAllocator, resource, new Random());
|
||||||
|
Future<Path> future = exec.submit(download);
|
||||||
|
resourcesToPaths.put(resource, future);
|
||||||
|
}
|
||||||
|
for (LocalResource resource : localResources.values()) {
|
||||||
|
Path path;
|
||||||
|
try {
|
||||||
|
path = resourcesToPaths.get(resource).get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
String pathString = path.toUri().toString();
|
||||||
|
if (resource.getType() == LocalResourceType.ARCHIVE) {
|
||||||
|
localArchives.add(pathString);
|
||||||
|
} else if (resource.getType() == LocalResourceType.FILE) {
|
||||||
|
localFiles.add(pathString);
|
||||||
|
}
|
||||||
|
Path resourcePath;
|
||||||
|
try {
|
||||||
|
resourcePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
LOG.info(String.format("Localized %s as %s", resourcePath, path));
|
||||||
|
String cp = resourcePath.toUri().getPath();
|
||||||
|
if (classpaths.keySet().contains(cp)) {
|
||||||
|
localClasspaths.add(path.toUri().getPath().toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the configuration object with localized data.
|
||||||
|
if (!localArchives.isEmpty()) {
|
||||||
|
conf.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
|
||||||
|
.arrayToString(localArchives.toArray(new String[localArchives
|
||||||
|
.size()])));
|
||||||
|
}
|
||||||
|
if (!localFiles.isEmpty()) {
|
||||||
|
conf.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
|
||||||
|
.arrayToString(localFiles.toArray(new String[localArchives
|
||||||
|
.size()])));
|
||||||
|
}
|
||||||
|
if (DistributedCache.getSymlink(conf)) {
|
||||||
|
// This is not supported largely because,
|
||||||
|
// for a Child subprocess, the cwd in LocalJobRunner
|
||||||
|
// is not a fresh slate, but rather the user's working directory.
|
||||||
|
// This is further complicated because the logic in
|
||||||
|
// setupWorkDir only creates symlinks if there's a jarfile
|
||||||
|
// in the configuration.
|
||||||
|
LOG.warn("LocalJobRunner does not support " +
|
||||||
|
"symlinking into current working dir.");
|
||||||
|
}
|
||||||
|
setupCalled = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Are the resources that should be added to the classpath?
|
||||||
|
* Should be called after setup().
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public boolean hasLocalClasspaths() {
|
||||||
|
if (!setupCalled) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"hasLocalClasspaths() should be called after setup()");
|
||||||
|
}
|
||||||
|
return !localClasspaths.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a class loader that includes the designated
|
||||||
|
* files and archives.
|
||||||
|
*/
|
||||||
|
public ClassLoader makeClassLoader(final ClassLoader parent)
|
||||||
|
throws MalformedURLException {
|
||||||
|
final URL[] urls = new URL[localClasspaths.size()];
|
||||||
|
for (int i = 0; i < localClasspaths.size(); ++i) {
|
||||||
|
urls[i] = new File(localClasspaths.get(i)).toURI().toURL();
|
||||||
|
LOG.info(urls[i]);
|
||||||
|
}
|
||||||
|
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
|
||||||
|
@Override
|
||||||
|
public ClassLoader run() {
|
||||||
|
return new URLClassLoader(urls, parent);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
FileContext localFSFileContext = FileContext.getLocalFSFileContext();
|
||||||
|
for (String archive : localArchives) {
|
||||||
|
localFSFileContext.delete(new Path(archive), true);
|
||||||
|
}
|
||||||
|
for (String file : localFiles) {
|
||||||
|
localFSFileContext.delete(new Path(file), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -18,7 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.mapred;
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -27,8 +26,8 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
@ -38,28 +37,23 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.LocalDirAllocator;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.ProtocolSignature;
|
import org.apache.hadoop.ipc.ProtocolSignature;
|
||||||
|
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
|
||||||
import org.apache.hadoop.mapreduce.ClusterMetrics;
|
import org.apache.hadoop.mapreduce.ClusterMetrics;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.QueueInfo;
|
import org.apache.hadoop.mapreduce.QueueInfo;
|
||||||
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
||||||
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
|
||||||
import org.apache.hadoop.mapreduce.TaskType;
|
import org.apache.hadoop.mapreduce.TaskType;
|
||||||
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
|
|
||||||
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
|
|
||||||
import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
|
|
||||||
import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
|
|
||||||
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
||||||
|
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
|
||||||
import org.apache.hadoop.mapreduce.v2.LogParams;
|
import org.apache.hadoop.mapreduce.v2.LogParams;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
|
||||||
import org.apache.hadoop.mapreduce.server.jobtracker.State;
|
|
||||||
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
|
|
||||||
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
@ -67,6 +61,7 @@
|
|||||||
/** Implements MapReduce locally, in-process, for debugging. */
|
/** Implements MapReduce locally, in-process, for debugging. */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
public class LocalJobRunner implements ClientProtocol {
|
public class LocalJobRunner implements ClientProtocol {
|
||||||
public static final Log LOG =
|
public static final Log LOG =
|
||||||
LogFactory.getLog(LocalJobRunner.class);
|
LogFactory.getLog(LocalJobRunner.class);
|
||||||
@ -82,7 +77,7 @@ public class LocalJobRunner implements ClientProtocol {
|
|||||||
private int reduce_tasks = 0;
|
private int reduce_tasks = 0;
|
||||||
final Random rand = new Random();
|
final Random rand = new Random();
|
||||||
|
|
||||||
private JobTrackerInstrumentation myMetrics = null;
|
private LocalJobRunnerMetrics myMetrics = null;
|
||||||
|
|
||||||
private static final String jobDir = "localRunner/";
|
private static final String jobDir = "localRunner/";
|
||||||
|
|
||||||
@ -125,8 +120,7 @@ private class Job extends Thread implements TaskUmbilicalProtocol {
|
|||||||
private FileSystem localFs;
|
private FileSystem localFs;
|
||||||
boolean killed = false;
|
boolean killed = false;
|
||||||
|
|
||||||
private TrackerDistributedCacheManager trackerDistributerdCacheManager;
|
private LocalDistributedCacheManager localDistributedCacheManager;
|
||||||
private TaskDistributedCacheManager taskDistributedCacheManager;
|
|
||||||
|
|
||||||
public long getProtocolVersion(String protocol, long clientVersion) {
|
public long getProtocolVersion(String protocol, long clientVersion) {
|
||||||
return TaskUmbilicalProtocol.versionID;
|
return TaskUmbilicalProtocol.versionID;
|
||||||
@ -150,27 +144,8 @@ public Job(JobID jobid, String jobSubmitDir) throws IOException {
|
|||||||
|
|
||||||
// Manage the distributed cache. If there are files to be copied,
|
// Manage the distributed cache. If there are files to be copied,
|
||||||
// this will trigger localFile to be re-written again.
|
// this will trigger localFile to be re-written again.
|
||||||
this.trackerDistributerdCacheManager =
|
localDistributedCacheManager = new LocalDistributedCacheManager();
|
||||||
new TrackerDistributedCacheManager(conf, new DefaultTaskController());
|
localDistributedCacheManager.setup(conf);
|
||||||
this.taskDistributedCacheManager =
|
|
||||||
trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
|
|
||||||
taskDistributedCacheManager.setup(
|
|
||||||
new LocalDirAllocator(MRConfig.LOCAL_DIR),
|
|
||||||
new File(systemJobDir.toString()),
|
|
||||||
"archive", "archive");
|
|
||||||
|
|
||||||
if (DistributedCache.getSymlink(conf)) {
|
|
||||||
// This is not supported largely because,
|
|
||||||
// for a Child subprocess, the cwd in LocalJobRunner
|
|
||||||
// is not a fresh slate, but rather the user's working directory.
|
|
||||||
// This is further complicated because the logic in
|
|
||||||
// setupWorkDir only creates symlinks if there's a jarfile
|
|
||||||
// in the configuration.
|
|
||||||
LOG.warn("LocalJobRunner does not support " +
|
|
||||||
"symlinking into current working dir.");
|
|
||||||
}
|
|
||||||
// Setup the symlinks for the distributed cache.
|
|
||||||
TaskRunner.setupWorkDir(conf, new File(localJobDir.toUri()).getAbsoluteFile());
|
|
||||||
|
|
||||||
// Write out configuration file. Instead of copying it from
|
// Write out configuration file. Instead of copying it from
|
||||||
// systemJobFile, we re-write it, since setup(), above, may have
|
// systemJobFile, we re-write it, since setup(), above, may have
|
||||||
@ -184,8 +159,8 @@ public Job(JobID jobid, String jobSubmitDir) throws IOException {
|
|||||||
this.job = new JobConf(localJobFile);
|
this.job = new JobConf(localJobFile);
|
||||||
|
|
||||||
// Job (the current object) is a Thread, so we wrap its class loader.
|
// Job (the current object) is a Thread, so we wrap its class loader.
|
||||||
if (!taskDistributedCacheManager.getClassPaths().isEmpty()) {
|
if (localDistributedCacheManager.hasLocalClasspaths()) {
|
||||||
setContextClassLoader(taskDistributedCacheManager.makeClassLoader(
|
setContextClassLoader(localDistributedCacheManager.makeClassLoader(
|
||||||
getContextClassLoader()));
|
getContextClassLoader()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,10 +175,6 @@ public Job(JobID jobid, String jobSubmitDir) throws IOException {
|
|||||||
this.start();
|
this.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
JobProfile getProfile() {
|
|
||||||
return profile;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Runnable instance that handles a map task to be run by an executor.
|
* A Runnable instance that handles a map task to be run by an executor.
|
||||||
*/
|
*/
|
||||||
@ -239,7 +210,7 @@ public void run() {
|
|||||||
info.getSplitIndex(), 1);
|
info.getSplitIndex(), 1);
|
||||||
map.setUser(UserGroupInformation.getCurrentUser().
|
map.setUser(UserGroupInformation.getCurrentUser().
|
||||||
getShortUserName());
|
getShortUserName());
|
||||||
TaskRunner.setupChildMapredLocalDirs(map, localConf);
|
setupChildMapredLocalDirs(map, localConf);
|
||||||
|
|
||||||
MapOutputFile mapOutput = new MROutputFiles();
|
MapOutputFile mapOutput = new MROutputFiles();
|
||||||
mapOutput.setConf(localConf);
|
mapOutput.setConf(localConf);
|
||||||
@ -333,7 +304,6 @@ protected ExecutorService createMapExecutor(int numMapTasks) {
|
|||||||
return executor;
|
return executor;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
JobID jobId = profile.getJobID();
|
JobID jobId = profile.getJobID();
|
||||||
@ -399,7 +369,7 @@ public void run() {
|
|||||||
getShortUserName());
|
getShortUserName());
|
||||||
JobConf localConf = new JobConf(job);
|
JobConf localConf = new JobConf(job);
|
||||||
localConf.set("mapreduce.jobtracker.address", "local");
|
localConf.set("mapreduce.jobtracker.address", "local");
|
||||||
TaskRunner.setupChildMapredLocalDirs(reduce, localConf);
|
setupChildMapredLocalDirs(reduce, localConf);
|
||||||
// move map output to reduce input
|
// move map output to reduce input
|
||||||
for (int i = 0; i < mapIds.size(); i++) {
|
for (int i = 0; i < mapIds.size(); i++) {
|
||||||
if (!this.isInterrupted()) {
|
if (!this.isInterrupted()) {
|
||||||
@ -473,8 +443,7 @@ public void run() {
|
|||||||
fs.delete(systemJobFile.getParent(), true); // delete submit dir
|
fs.delete(systemJobFile.getParent(), true); // delete submit dir
|
||||||
localFs.delete(localJobFile, true); // delete local copy
|
localFs.delete(localJobFile, true); // delete local copy
|
||||||
// Cleanup distributed cache
|
// Cleanup distributed cache
|
||||||
taskDistributedCacheManager.release();
|
localDistributedCacheManager.close();
|
||||||
trackerDistributerdCacheManager.purgeCache();
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Error cleaning up "+id+": "+e);
|
LOG.warn("Error cleaning up "+id+": "+e);
|
||||||
}
|
}
|
||||||
@ -593,7 +562,7 @@ public LocalJobRunner(Configuration conf) throws IOException {
|
|||||||
public LocalJobRunner(JobConf conf) throws IOException {
|
public LocalJobRunner(JobConf conf) throws IOException {
|
||||||
this.fs = FileSystem.getLocal(conf);
|
this.fs = FileSystem.getLocal(conf);
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
|
myMetrics = new LocalJobRunnerMetrics(new JobConf(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
// JobSubmissionProtocol methods
|
// JobSubmissionProtocol methods
|
||||||
@ -661,14 +630,6 @@ public ClusterMetrics getClusterMetrics() {
|
|||||||
reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
|
reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @deprecated Use {@link #getJobTrackerStatus()} instead.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public State getJobTrackerState() throws IOException, InterruptedException {
|
|
||||||
return State.RUNNING;
|
|
||||||
}
|
|
||||||
|
|
||||||
public JobTrackerStatus getJobTrackerStatus() {
|
public JobTrackerStatus getJobTrackerStatus() {
|
||||||
return JobTrackerStatus.RUNNING;
|
return JobTrackerStatus.RUNNING;
|
||||||
}
|
}
|
||||||
@ -723,7 +684,7 @@ public String getSystemDir() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see org.apache.hadoop.mapred.JobSubmissionProtocol#getQueueAdmins()
|
* @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
|
||||||
*/
|
*/
|
||||||
public AccessControlList getQueueAdmins(String queueName) throws IOException {
|
public AccessControlList getQueueAdmins(String queueName) throws IOException {
|
||||||
return new AccessControlList(" ");// no queue admins for local job runner
|
return new AccessControlList(" ");// no queue admins for local job runner
|
||||||
@ -820,4 +781,37 @@ public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
|
|||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
throw new UnsupportedOperationException("Not supported");
|
throw new UnsupportedOperationException("Not supported");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setupChildMapredLocalDirs(Task t, JobConf conf) {
|
||||||
|
String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
|
||||||
|
String jobId = t.getJobID().toString();
|
||||||
|
String taskId = t.getTaskID().toString();
|
||||||
|
boolean isCleanup = t.isTaskCleanupTask();
|
||||||
|
String user = t.getUser();
|
||||||
|
StringBuffer childMapredLocalDir =
|
||||||
|
new StringBuffer(localDirs[0] + Path.SEPARATOR
|
||||||
|
+ getLocalTaskDir(user, jobId, taskId, isCleanup));
|
||||||
|
for (int i = 1; i < localDirs.length; i++) {
|
||||||
|
childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
|
||||||
|
+ getLocalTaskDir(user, jobId, taskId, isCleanup));
|
||||||
|
}
|
||||||
|
LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
|
||||||
|
conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
static final String TASK_CLEANUP_SUFFIX = ".cleanup";
|
||||||
|
static final String SUBDIR = jobDir;
|
||||||
|
static final String JOBCACHE = "jobcache";
|
||||||
|
|
||||||
|
static String getLocalTaskDir(String user, String jobid, String taskid,
|
||||||
|
boolean isCleanupAttempt) {
|
||||||
|
String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
|
||||||
|
+ Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
|
||||||
|
if (isCleanupAttempt) {
|
||||||
|
taskDir = taskDir + TASK_CLEANUP_SUFFIX;
|
||||||
|
}
|
||||||
|
return taskDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
@ -0,0 +1,98 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapred;
|
||||||
|
|
||||||
|
import org.apache.hadoop.metrics.MetricsContext;
|
||||||
|
import org.apache.hadoop.metrics.MetricsRecord;
|
||||||
|
import org.apache.hadoop.metrics.MetricsUtil;
|
||||||
|
import org.apache.hadoop.metrics.Updater;
|
||||||
|
import org.apache.hadoop.metrics.jvm.JvmMetrics;
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
class LocalJobRunnerMetrics implements Updater {
|
||||||
|
private final MetricsRecord metricsRecord;
|
||||||
|
|
||||||
|
private int numMapTasksLaunched = 0;
|
||||||
|
private int numMapTasksCompleted = 0;
|
||||||
|
private int numReduceTasksLaunched = 0;
|
||||||
|
private int numReduceTasksCompleted = 0;
|
||||||
|
private int numWaitingMaps = 0;
|
||||||
|
private int numWaitingReduces = 0;
|
||||||
|
|
||||||
|
public LocalJobRunnerMetrics(JobConf conf) {
|
||||||
|
String sessionId = conf.getSessionId();
|
||||||
|
// Initiate JVM Metrics
|
||||||
|
JvmMetrics.init("JobTracker", sessionId);
|
||||||
|
// Create a record for map-reduce metrics
|
||||||
|
MetricsContext context = MetricsUtil.getContext("mapred");
|
||||||
|
// record name is jobtracker for compatibility
|
||||||
|
metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
|
||||||
|
metricsRecord.setTag("sessionId", sessionId);
|
||||||
|
context.registerUpdater(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Since this object is a registered updater, this method will be called
|
||||||
|
* periodically, e.g. every 5 seconds.
|
||||||
|
*/
|
||||||
|
public void doUpdates(MetricsContext unused) {
|
||||||
|
synchronized (this) {
|
||||||
|
metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
|
||||||
|
metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
|
||||||
|
metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
|
||||||
|
metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
|
||||||
|
metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
|
||||||
|
metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
|
||||||
|
|
||||||
|
numMapTasksLaunched = 0;
|
||||||
|
numMapTasksCompleted = 0;
|
||||||
|
numReduceTasksLaunched = 0;
|
||||||
|
numReduceTasksCompleted = 0;
|
||||||
|
numWaitingMaps = 0;
|
||||||
|
numWaitingReduces = 0;
|
||||||
|
}
|
||||||
|
metricsRecord.update();
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void launchMap(TaskAttemptID taskAttemptID) {
|
||||||
|
++numMapTasksLaunched;
|
||||||
|
decWaitingMaps(taskAttemptID.getJobID(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void completeMap(TaskAttemptID taskAttemptID) {
|
||||||
|
++numMapTasksCompleted;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
|
||||||
|
++numReduceTasksLaunched;
|
||||||
|
decWaitingReduces(taskAttemptID.getJobID(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
|
||||||
|
++numReduceTasksCompleted;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void decWaitingMaps(JobID id, int task) {
|
||||||
|
numWaitingMaps -= task;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void decWaitingReduces(JobID id, int task){
|
||||||
|
numWaitingReduces -= task;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
#
|
||||||
|
org.apache.hadoop.mapred.LocalClientProtocolProvider
|
@ -163,17 +163,6 @@ public void testLocalJobRunner() throws Exception {
|
|||||||
testWithConf(c);
|
testWithConf(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Tests using a full MiniMRCluster. */
|
|
||||||
public void testMiniMRJobRunner() throws Exception {
|
|
||||||
MiniMRCluster m = new MiniMRCluster(1, "file:///", 1);
|
|
||||||
try {
|
|
||||||
testWithConf(m.createJobConf());
|
|
||||||
} finally {
|
|
||||||
m.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private Path createTempFile(String filename, String contents)
|
private Path createTempFile(String filename, String contents)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Path path = new Path(TEST_ROOT_DIR, filename);
|
Path path = new Path(TEST_ROOT_DIR, filename);
|
@ -1166,7 +1166,7 @@
|
|||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>mapreduce.framework.name</name>
|
<name>mapreduce.framework.name</name>
|
||||||
<value>yarn</value>
|
<value>local</value>
|
||||||
<description>The runtime framework for executing MapReduce jobs.
|
<description>The runtime framework for executing MapReduce jobs.
|
||||||
Can be one of local, classic or yarn.
|
Can be one of local, classic or yarn.
|
||||||
</description>
|
</description>
|
||||||
|
@ -16,7 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
package org.apache.hadoop.yarn.util;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
@ -65,7 +65,7 @@ public class FSDownload implements Callable<Path> {
|
|||||||
static final FsPermission PUBLIC_DIR_PERMS = new FsPermission((short) 0755);
|
static final FsPermission PUBLIC_DIR_PERMS = new FsPermission((short) 0755);
|
||||||
static final FsPermission PRIVATE_DIR_PERMS = new FsPermission((short) 0700);
|
static final FsPermission PRIVATE_DIR_PERMS = new FsPermission((short) 0700);
|
||||||
|
|
||||||
FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
|
public FSDownload(FileContext files, UserGroupInformation ugi, Configuration conf,
|
||||||
LocalDirAllocator dirs, LocalResource resource, Random rand) {
|
LocalDirAllocator dirs, LocalResource resource, Random rand) {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.dirs = dirs;
|
this.dirs = dirs;
|
@ -16,7 +16,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
|
package org.apache.hadoop.yarn.util;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
import static org.apache.hadoop.fs.CreateFlag.CREATE;
|
||||||
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
|
@ -65,6 +65,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
@ -110,6 +110,7 @@
|
|||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.service.CompositeService;
|
import org.apache.hadoop.yarn.service.CompositeService;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
import org.apache.hadoop.yarn.util.FSDownload;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
|
@ -12,4 +12,3 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
org.apache.hadoop.mapred.JobTrackerClientProtocolProvider
|
org.apache.hadoop.mapred.JobTrackerClientProtocolProvider
|
||||||
org.apache.hadoop.mapred.LocalClientProtocolProvider
|
|
||||||
|
Loading…
Reference in New Issue
Block a user