MAPREDUCE-3253. Fixed ContextFactory to clone JobContext correctly.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188842 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fffdf661e3
commit
2486e4eb09
@ -1764,6 +1764,9 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-2821. Added missing fields (resourcePerMap & resourcePerReduce)
|
||||
to JobSummary logs. (mahadev via acmurthy)
|
||||
|
||||
MAPREDUCE-3253. Fixed ContextFactory to clone JobContext correctly.
|
||||
(acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
|
||||
|
||||
/**
|
||||
* A factory to allow applications to deal with inconsistencies between
|
||||
@ -123,7 +124,7 @@ public class ContextFactory {
|
||||
WRAPPED_CONTEXT_FIELD = null;
|
||||
}
|
||||
MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
|
||||
REPORTER_FIELD = taskIOContextCls.getDeclaredField("reporter");
|
||||
REPORTER_FIELD = taskContextCls.getDeclaredField("reporter");
|
||||
REPORTER_FIELD.setAccessible(true);
|
||||
READER_FIELD = mapContextCls.getDeclaredField("reader");
|
||||
READER_FIELD.setAccessible(true);
|
||||
@ -141,7 +142,8 @@ public class ContextFactory {
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone a job or task attempt context with a new configuration.
|
||||
* Clone a {@link JobContext} or {@link TaskAttemptContext} with a
|
||||
* new configuration.
|
||||
* @param original the original context
|
||||
* @param conf the new configuration
|
||||
* @return a new context object
|
||||
@ -176,7 +178,8 @@ public static JobContext cloneContext(JobContext original,
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy a mapper context, optionally replacing the input and output.
|
||||
* Copy a custom {@link WrappedMapper.Context}, optionally replacing
|
||||
* the input and output.
|
||||
* @param <K1> input key type
|
||||
* @param <V1> input value type
|
||||
* @param <K2> output key type
|
||||
|
@ -0,0 +1,46 @@
|
||||
package org.apache.hadoop.mapreduce;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
|
||||
import org.apache.hadoop.mapreduce.task.JobContextImpl;
|
||||
import org.apache.hadoop.mapreduce.task.MapContextImpl;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestContextFactory {
|
||||
|
||||
JobID jobId;
|
||||
Configuration conf;
|
||||
JobContext jobContext;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new Configuration();
|
||||
jobId = new JobID("test", 1);
|
||||
jobContext = new JobContextImpl(conf, jobId);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloneContext() throws Exception {
|
||||
ContextFactory.cloneContext(jobContext, conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloneMapContext() throws Exception {
|
||||
TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
|
||||
TaskAttemptID taskAttemptid = new TaskAttemptID(taskId, 0);
|
||||
MapContext<IntWritable, IntWritable, IntWritable, IntWritable> mapContext =
|
||||
new MapContextImpl<IntWritable, IntWritable, IntWritable, IntWritable>(
|
||||
conf, taskAttemptid, null, null, null, null, null);
|
||||
Mapper<IntWritable, IntWritable, IntWritable, IntWritable>.Context mapperContext =
|
||||
new WrappedMapper<IntWritable, IntWritable, IntWritable, IntWritable>().getMapContext(
|
||||
mapContext);
|
||||
ContextFactory.cloneMapContext(mapperContext, conf, null, null);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void tearDown() throws Exception {
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user