MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered (Jason Lowe via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1457119 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Joseph Evans 2013-03-15 21:09:25 +00:00
parent 415d038319
commit 7d7553c4eb
14 changed files with 236 additions and 25 deletions

View File

@ -815,6 +815,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-5060. Fetch failures that time out only count against the first
map task (Robert Joseph Evans via jlowe)
MAPREDUCE-5042. Reducer unable to fetch for a map task that was recovered
(Jason Lowe via bobby)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -269,9 +269,17 @@ private static JobConf configureTask(Task task, Credentials credentials,
job.setBoolean("ipc.client.tcpnodelay", true);
job.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS,
YarnOutputFiles.class, MapOutputFile.class);
// set the jobTokenFile into task
// set the jobToken and shuffle secrets into task
task.setJobTokenSecret(
JobTokenSecretManager.createSecretKey(jt.getPassword()));
byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials);
if (shuffleSecret == null) {
LOG.warn("Shuffle secret missing from task credentials."
+ " Using job token secret as shuffle secret.");
shuffleSecret = jt.getPassword();
}
task.setShuffleSecret(
JobTokenSecretManager.createSecretKey(shuffleSecret));
// setup the child's MRConfig.LOCAL_DIR.
configureLocalDirs(task, job);

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
@ -339,8 +340,15 @@ public void init(final Configuration conf) {
boolean recoveryEnabled = conf.getBoolean(
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
boolean recoverySupportedByCommitter = committer.isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
// are reducers as the shuffle secret would be app attempt specific.
boolean shuffleKeyValidForRecovery = (numReduceTasks > 0 &&
TokenCache.getShuffleSecretKey(fsTokens) != null);
if (recoveryEnabled && recoverySupportedByCommitter
&& appAttemptID.getAttemptId() > 1) {
&& shuffleKeyValidForRecovery && appAttemptID.getAttemptId() > 1) {
LOG.info("Recovery is enabled. "
+ "Will try to recover from previous life on best effort basis.");
recoveryServ = createRecoveryService(context);
@ -351,7 +359,8 @@ public void init(final Configuration conf) {
} else {
LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ recoveryEnabled + " recoverySupportedByCommitter: "
+ recoverySupportedByCommitter + " ApplicationAttemptID: "
+ recoverySupportedByCommitter + " shuffleKeyValidForRecovery: "
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
+ appAttemptID.getAttemptId());
dispatcher = createDispatcher();
addIfService(dispatcher);
@ -472,6 +481,10 @@ protected FileSystem getFileSystem(Configuration conf) throws IOException {
return FileSystem.get(conf);
}
protected Credentials getCredentials() {
return fsTokens;
}
/**
* clean up staging directories for the job.
* @throws IOException

View File

@ -1350,13 +1350,13 @@ protected void setup(JobImpl job) throws IOException {
LOG.info("Adding job token for " + oldJobIDString
+ " to jobTokenSecretManager");
// Upload the jobTokens onto the remote FS so that ContainerManager can
// localize it to be used by the Containers(tasks)
Credentials tokenStorage = new Credentials();
TokenCache.setJobToken(job.jobToken, tokenStorage);
if (UserGroupInformation.isSecurityEnabled()) {
tokenStorage.addAll(job.fsTokens);
// If the job client did not setup the shuffle secret then reuse
// the job token secret for the shuffle.
if (TokenCache.getShuffleSecretKey(job.fsTokens) == null) {
LOG.warn("Shuffle secret key missing from job credentials."
+ " Using job token secret as shuffle secret.");
TokenCache.setShuffleSecretKey(job.jobToken.getPassword(),
job.fsTokens);
}
}

View File

@ -702,10 +702,21 @@ private static ContainerLaunchContext createCommonContainerLaunchContext(
ByteBuffer.wrap(containerTokens_dob.getData(), 0,
containerTokens_dob.getLength());
// Add shuffle token
// Add shuffle secret key
// The secret key is converted to a JobToken to preserve backwards
// compatibility with an older ShuffleHandler running on an NM.
LOG.info("Putting shuffle token in serviceData");
byte[] shuffleSecret = TokenCache.getShuffleSecretKey(credentials);
if (shuffleSecret == null) {
LOG.warn("Cannot locate shuffle secret in credentials."
+ " Using job token as shuffle secret.");
shuffleSecret = jobToken.getPassword();
}
Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>(
jobToken.getIdentifier(), shuffleSecret, jobToken.getKind(),
jobToken.getService());
serviceData.put(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
ShuffleHandler.serializeServiceData(jobToken));
ShuffleHandler.serializeServiceData(shuffleToken));
Apps.addToEnvironment(
environment,

View File

@ -42,6 +42,7 @@
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -144,6 +145,9 @@ public MRApp(int maps, int reduces, boolean autoComplete, String testName,
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
// Fake a shuffle secret that normally is provided by the job client.
String shuffleSecret = "fake-shuffle-secret";
TokenCache.setShuffleSecretKey(shuffleSecret.getBytes(), getCredentials());
}
private static ApplicationAttemptId getApplicationAttemptId(

View File

@ -900,6 +900,117 @@ public void testSpeculative() throws Exception {
}
@Test(timeout=30000)
public void testRecoveryWithoutShuffleSecret() throws Exception {
int runCount = 0;
MRApp app = new MRAppNoShuffleSecret(2, 1, false,
this.getClass().getName(), true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task reduceTask = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator().next();
TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
// reduces must be in NEW state
Assert.assertEquals("Reduce Task state not correct",
TaskState.RUNNING, reduceTask.getReport().getTaskState());
//send the done signal to the 1st map attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
//stop the app
app.stop();
//in recovery the 1st map should NOT be recovered from previous run
//since the shuffle secret was not provided with the job credentials
//and had to be rolled per app attempt
app = new MRAppNoShuffleSecret(2, 1, false,
this.getClass().getName(), false, ++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
//all maps would be running
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
reduceTask = it.next();
app.waitForState(mapTask1, TaskState.RUNNING);
app.waitForState(mapTask2, TaskState.RUNNING);
task2Attempt = mapTask2.getAttempts().values().iterator().next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask2.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(mapTask2, TaskState.SUCCEEDED);
//verify first map task is still running
app.waitForState(mapTask1, TaskState.RUNNING);
//send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask1.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(mapTask1, TaskState.SUCCEEDED);
//wait for reduce to be running before sending done
app.waitForState(reduceTask, TaskState.RUNNING);
//send the done signal to the reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduceTask.getAttempts().values().iterator().next().getID(),
TaskAttemptEventType.TA_DONE));
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
}
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
@ -1019,6 +1130,18 @@ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
}
}
static class MRAppNoShuffleSecret extends MRAppWithHistory {
public MRAppNoShuffleSecret(int maps, int reduces, boolean autoComplete,
String testName, boolean cleanOnStart, int startCount) {
super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
}
@Override
protected void downloadTokensAndSetupUGI(Configuration conf) {
// do NOT put a shuffle secret in the job credentials
}
}
public static void main(String[] arg) throws Exception {
TestRecovery test = new TestRecovery();
test.testCrashed();

View File

@ -491,7 +491,7 @@ private boolean testUberDecision(Configuration conf) {
MRAppMetrics mrAppMetrics = MRAppMetrics.create();
JobImpl job = new JobImpl(jobId, Records
.newRecord(ApplicationAttemptId.class), conf, mock(EventHandler.class),
null, mock(JobTokenSecretManager.class), null, null, null,
null, new JobTokenSecretManager(), new Credentials(), null, null,
mrAppMetrics, true, null, 0, null, null, null, null);
InitTransition initTransition = getInitTransition(2);
JobEvent mockJobEvent = mock(JobEvent.class);

View File

@ -185,6 +185,7 @@ static synchronized String getOutputName(int partition) {
private int numSlotsRequired;
protected TaskUmbilicalProtocol umbilical;
protected SecretKey tokenSecret;
protected SecretKey shuffleSecret;
protected GcTimeUpdater gcUpdater;
////////////////////////////////////////////
@ -261,6 +262,21 @@ public SecretKey getJobTokenSecret() {
return this.tokenSecret;
}
/**
* Set the secret key used to authenticate the shuffle
* @param shuffleSecret the secret
*/
public void setShuffleSecret(SecretKey shuffleSecret) {
this.shuffleSecret = shuffleSecret;
}
/**
* Get the secret key used to authenticate the shuffle
* @return the shuffle secret
*/
public SecretKey getShuffleSecret() {
return this.shuffleSecret;
}
/**
* Get the index of this task within the job.

View File

@ -23,11 +23,15 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -62,6 +66,8 @@
@InterfaceStability.Unstable
class JobSubmitter {
protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
private static final int SHUFFLE_KEY_LENGTH = 64;
private FileSystem jtFs;
private ClientProtocol submitClient;
private String submitHostName;
@ -359,6 +365,20 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
populateTokenCache(conf, job.getCredentials());
// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

View File

@ -154,7 +154,8 @@ private static void mergeBinaryTokens(Credentials creds, Configuration conf) {
*/
@InterfaceAudience.Private
public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
private static final Text JOB_TOKEN = new Text("JobToken");
private static final Text SHUFFLE_TOKEN = new Text("MapReduceShuffleToken");
/**
* load job token from a file
@ -194,4 +195,14 @@ public static void setJobToken(Token<? extends TokenIdentifier> t,
public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
}
@InterfaceAudience.Private
public static void setShuffleSecretKey(byte[] key, Credentials credentials) {
credentials.addSecretKey(SHUFFLE_TOKEN, key);
}
@InterfaceAudience.Private
public static byte[] getShuffleSecretKey(Credentials credentials) {
return getSecretKey(credentials, SHUFFLE_TOKEN);
}
}

View File

@ -82,7 +82,7 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
private final int connectionTimeout;
private final int readTimeout;
private final SecretKey jobTokenSecret;
private final SecretKey shuffleSecretKey;
private volatile boolean stopped = false;
@ -92,7 +92,7 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
public Fetcher(JobConf job, TaskAttemptID reduceId,
ShuffleScheduler<K,V> scheduler, MergeManager<K,V> merger,
Reporter reporter, ShuffleClientMetrics metrics,
ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
ExceptionReporter exceptionReporter, SecretKey shuffleKey) {
this.reporter = reporter;
this.scheduler = scheduler;
this.merger = merger;
@ -100,7 +100,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,
this.exceptionReporter = exceptionReporter;
this.id = ++nextId;
this.reduce = reduceId.getTaskID().getId();
this.jobTokenSecret = jobTokenSecret;
this.shuffleSecretKey = shuffleKey;
ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
ShuffleErrors.IO_ERROR.toString());
wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
@ -228,7 +228,8 @@ protected void copyFromHost(MapHost host) throws IOException {
// generate hash of the url
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
String encHash = SecureShuffleUtils.hashFromString(msgToEncode,
shuffleSecretKey);
// put url hash into http header
connection.addRequestProperty(
@ -253,7 +254,7 @@ protected void copyFromHost(MapHost host) throws IOException {
}
LOG.debug("url="+msgToEncode+";encHash="+encHash+";replyHash="+replyHash);
// verify that replyHash is HMac of encHash
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecretKey);
LOG.info("for url="+msgToEncode+" sent hash and received reply");
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;

View File

@ -108,7 +108,7 @@ public RawKeyValueIterator run() throws IOException, InterruptedException {
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getJobTokenSecret());
reduceTask.getShuffleSecret());
fetchers[i].start();
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
@ -106,7 +107,7 @@ public void testRunner() throws Exception {
Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
"user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
"service"));
conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
TokenCache.setJobToken(token, conf.getCredentials());
conf.setBoolean(MRJobConfig.SKIP_RECORDS, true);
TestTaskReporter reporter = new TestTaskReporter();
PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text> runner = new PipesMapRunner<FloatWritable, NullWritable, IntWritable, Text>();
@ -171,7 +172,7 @@ public void testApplication() throws Throwable {
"user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
"service"));
conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
TokenCache.setJobToken(token, conf.getCredentials());
FakeCollector output = new FakeCollector(new Counters.Counter(),
new Progress());
FileSystem fs = new RawLocalFileSystem();
@ -391,7 +392,7 @@ public void testPipesReduser() throws Exception {
Token<ApplicationTokenIdentifier> token = new Token<ApplicationTokenIdentifier>(
"user".getBytes(), "password".getBytes(), new Text("kind"), new Text(
"service"));
conf.getCredentials().addToken(new Text("ShuffleAndJobToken"), token);
TokenCache.setJobToken(token, conf.getCredentials());
File fCommand = getFileCommand("org.apache.hadoop.mapred.pipes.PipeReducerStub");
conf.set(MRJobConfig.CACHE_LOCALFILES, fCommand.getAbsolutePath());