MAPREDUCE-5685. Fixed a bug with JobContext getCacheFiles API inside the WrappedReducer class. Contributed by Yi Song.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1554320 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7f86c8114e
commit
b524501d4f
@ -264,6 +264,9 @@ Release 2.4.0 - UNRELEASED
|
||||
MAPREDUCE-5694. Fixed MR AppMaster to shutdown the LogManager so as to avoid
|
||||
losing syslog in some conditions. (Mohammad Kamrul Islam via vinodkv)
|
||||
|
||||
MAPREDUCE-5685. Fixed a bug with JobContext getCacheFiles API inside the
|
||||
WrappedReducer class. (Yi Song via vinodkv)
|
||||
|
||||
Release 2.3.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -44,6 +44,8 @@
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
|
||||
@ -82,12 +84,11 @@ public class TestMRWithDistributedCache extends TestCase {
|
||||
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(TestMRWithDistributedCache.class);
|
||||
|
||||
private static class DistributedCacheChecker {
|
||||
|
||||
public static class DistributedCacheChecker extends
|
||||
Mapper<LongWritable, Text, NullWritable, NullWritable> {
|
||||
|
||||
@Override
|
||||
public void setup(Context context) throws IOException {
|
||||
public void setup(TaskInputOutputContext<?, ?, ?, ?> context)
|
||||
throws IOException {
|
||||
Configuration conf = context.getConfiguration();
|
||||
Path[] localFiles = context.getLocalCacheFiles();
|
||||
URI[] files = context.getCacheFiles();
|
||||
@ -101,6 +102,10 @@ public void setup(Context context) throws IOException {
|
||||
TestCase.assertEquals(2, files.length);
|
||||
TestCase.assertEquals(2, archives.length);
|
||||
|
||||
// Check the file name
|
||||
TestCase.assertTrue(files[0].getPath().endsWith("distributed.first"));
|
||||
TestCase.assertTrue(files[1].getPath().endsWith("distributed.second.jar"));
|
||||
|
||||
// Check lengths of the files
|
||||
TestCase.assertEquals(1, fs.getFileStatus(localFiles[0]).getLen());
|
||||
TestCase.assertTrue(fs.getFileStatus(localFiles[1]).getLen() > 1);
|
||||
@ -130,8 +135,28 @@ public void setup(Context context) throws IOException {
|
||||
TestCase.assertTrue("second file should be symlinked too",
|
||||
expectedAbsentSymlinkFile.exists());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static class DistributedCacheCheckerMapper extends
|
||||
Mapper<LongWritable, Text, NullWritable, NullWritable> {
|
||||
|
||||
@Override
|
||||
protected void setup(Context context) throws IOException,
|
||||
InterruptedException {
|
||||
new DistributedCacheChecker().setup(context);
|
||||
}
|
||||
}
|
||||
|
||||
public static class DistributedCacheCheckerReducer extends
|
||||
Reducer<LongWritable, Text, NullWritable, NullWritable> {
|
||||
|
||||
@Override
|
||||
public void setup(Context context) throws IOException {
|
||||
new DistributedCacheChecker().setup(context);
|
||||
}
|
||||
}
|
||||
|
||||
private void testWithConf(Configuration conf) throws IOException,
|
||||
InterruptedException, ClassNotFoundException, URISyntaxException {
|
||||
// Create a temporary file of length 1.
|
||||
@ -146,7 +171,8 @@ private void testWithConf(Configuration conf) throws IOException,
|
||||
|
||||
|
||||
Job job = Job.getInstance(conf);
|
||||
job.setMapperClass(DistributedCacheChecker.class);
|
||||
job.setMapperClass(DistributedCacheCheckerMapper.class);
|
||||
job.setReducerClass(DistributedCacheCheckerReducer.class);
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
FileInputFormat.setInputPaths(job, first);
|
||||
// Creates the Job Configuration
|
||||
|
@ -137,7 +137,7 @@ public URI[] getCacheArchives() throws IOException {
|
||||
|
||||
@Override
|
||||
public URI[] getCacheFiles() throws IOException {
|
||||
return reduceContext.getCacheArchives();
|
||||
return reduceContext.getCacheFiles();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user