MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611196 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2014-07-16 21:24:51 +00:00
parent e896de98f2
commit 43a12f3c01
3 changed files with 117 additions and 38 deletions

View File

@ -307,6 +307,9 @@ Release 2.5.0 - UNRELEASED
resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
vinodkv)
MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES

View File

@ -30,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError;
@ -437,43 +438,6 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
}
}
/**
* Within the _local_ filesystem (not HDFS), all activity takes place within
* a single subdir (${local.dir}/usercache/$user/appcache/$appId/$contId/),
* and all sub-MapTasks create the same filename ("file.out"). Rename that
* to something unique (e.g., "map_0.out") to avoid collisions.
*
* Longer-term, we'll modify [something] to use TaskAttemptID-based
* filenames instead of "file.out". (All of this is entirely internal,
* so there are no particular compatibility issues.)
*/
private MapOutputFile renameMapOutputForReduce(JobConf conf,
TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
// move map output to reduce input
Path mapOut = subMapOutputFile.getOutputFile();
FileStatus mStatus = localFs.getFileStatus(mapOut);
Path reduceIn = subMapOutputFile.getInputFileForWrite(
TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
Path mapOutIndex = new Path(mapOut.toString() + ".index");
Path reduceInIndex = new Path(reduceIn.toString() + ".index");
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming map output file for task attempt "
+ mapId.toString() + " from original location " + mapOut.toString()
+ " to destination " + reduceIn.toString());
}
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
}
if (!localFs.rename(mapOut, reduceIn))
throw new IOException("Couldn't rename " + mapOut);
if (!localFs.rename(mapOutIndex, reduceInIndex))
throw new IOException("Couldn't rename " + mapOutIndex);
return new RenamedMapOutputFile(reduceIn);
}
/**
* Also within the local filesystem, we need to restore the initial state
* of the directory as much as possible. Compare current contents against
@ -506,7 +470,46 @@ private void relocalize() {
}
} // end EventHandler
/**
* Within the _local_ filesystem (not HDFS), all activity takes place within
* a subdir inside one of the LOCAL_DIRS
* (${local.dir}/usercache/$user/appcache/$appId/$contId/),
* and all sub-MapTasks create the same filename ("file.out"). Rename that
* to something unique (e.g., "map_0.out") to avoid possible collisions.
*
* Longer-term, we'll modify [something] to use TaskAttemptID-based
* filenames instead of "file.out". (All of this is entirely internal,
* so there are no particular compatibility issues.)
*/
@VisibleForTesting
protected static MapOutputFile renameMapOutputForReduce(JobConf conf,
TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
FileSystem localFs = FileSystem.getLocal(conf);
// move map output to reduce input
Path mapOut = subMapOutputFile.getOutputFile();
FileStatus mStatus = localFs.getFileStatus(mapOut);
Path reduceIn = subMapOutputFile.getInputFileForWrite(
TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
Path reduceInIndex = new Path(reduceIn.toString() + ".index");
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming map output file for task attempt "
+ mapId.toString() + " from original location " + mapOut.toString()
+ " to destination " + reduceIn.toString());
}
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
}
if (!localFs.rename(mapOut, reduceIn))
throw new IOException("Couldn't rename " + mapOut);
if (!localFs.rename(mapOutIndex, reduceInIndex))
throw new IOException("Couldn't rename " + mapOutIndex);
return new RenamedMapOutputFile(reduceIn);
}
private static class RenamedMapOutputFile extends MapOutputFile {
private Path path;

View File

@ -18,17 +18,26 @@
package org.apache.hadoop.mapred;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@ -46,6 +55,9 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -53,6 +65,36 @@
public class TestLocalContainerLauncher {
private static final Log LOG =
LogFactory.getLog(TestLocalContainerLauncher.class);
private static File testWorkDir;
private static final String[] localDirs = new String[2];
private static void delete(File dir) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
fs.delete(p, true);
}
@BeforeClass
public static void setupTestDirs() throws IOException {
testWorkDir = new File("target",
TestLocalContainerLauncher.class.getCanonicalName());
testWorkDir.delete();
testWorkDir.mkdirs();
testWorkDir = testWorkDir.getAbsoluteFile();
for (int i = 0; i < localDirs.length; i++) {
final File dir = new File(testWorkDir, "local-" + i);
dir.mkdirs();
localDirs[i] = dir.toString();
}
}
@AfterClass
public static void cleanupTestDirs() throws IOException {
if (testWorkDir != null) {
delete(testWorkDir);
}
}
@SuppressWarnings("rawtypes")
@Test(timeout=10000)
@ -141,4 +183,35 @@ private static Container createMockContainer() {
when(container.getNodeId()).thenReturn(nodeId);
return container;
}
@Test
public void testRenameMapOutputForReduce() throws Exception {
final JobConf conf = new JobConf();
final MROutputFiles mrOutputFiles = new MROutputFiles();
mrOutputFiles.setConf(conf);
// make sure both dirs are distinct
//
conf.set(MRConfig.LOCAL_DIR, localDirs[0].toString());
final Path mapOut = mrOutputFiles.getOutputFileForWrite(1);
conf.set(MRConfig.LOCAL_DIR, localDirs[1].toString());
final Path mapOutIdx = mrOutputFiles.getOutputIndexFileForWrite(1);
Assert.assertNotEquals("Paths must be different!",
mapOut.getParent(), mapOutIdx.getParent());
// make both dirs part of LOCAL_DIR
conf.setStrings(MRConfig.LOCAL_DIR, localDirs);
final FileContext lfc = FileContext.getLocalFSFileContext(conf);
lfc.create(mapOut, EnumSet.of(CREATE)).close();
lfc.create(mapOutIdx, EnumSet.of(CREATE)).close();
final JobId jobId = MRBuilderUtils.newJobId(12345L, 1, 2);
final TaskId tid = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
final TaskAttemptId taid = MRBuilderUtils.newTaskAttemptId(tid, 0);
LocalContainerLauncher.renameMapOutputForReduce(conf, taid, mrOutputFiles);
}
}