MAPREDUCE-5157. Bring back old sampler related code so that we can support binary compatibility with hadoop-1 sorter example. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480474 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-08 21:42:47 +00:00
parent 67d7d8f4d3
commit f6167ef6c6
4 changed files with 384 additions and 11 deletions

View File

@ -224,6 +224,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5159. Change ValueAggregatorJob to add APIs which can support
binary compatibility with hadoop-1 examples. (Zhijie Shen via vinodkv)
MAPREDUCE-5157. Bring back old sampler related code so that we can support
binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv)
OPTIMIZATIONS
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method

View File

@ -19,10 +19,18 @@
package org.apache.hadoop.mapred.lib;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
@InterfaceAudience.Public
@ -30,6 +38,8 @@
public class InputSampler<K,V> extends
org.apache.hadoop.mapreduce.lib.partition.InputSampler<K, V> {
private static final Log LOG = LogFactory.getLog(InputSampler.class);
public InputSampler(JobConf conf) {
super(conf);
}
@ -38,4 +48,219 @@ public static <K,V> void writePartitionFile(JobConf job, Sampler<K,V> sampler)
throws IOException, ClassNotFoundException, InterruptedException {
writePartitionFile(new Job(job), sampler);
}
/**
* Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
*/
public interface Sampler<K,V> extends
org.apache.hadoop.mapreduce.lib.partition.InputSampler.Sampler<K, V> {
/**
* For a given job, collect and return a subset of the keys from the
* input data.
*/
K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
}
/**
* Samples the first n records from s splits.
* Inexpensive way to sample random data.
*/
public static class SplitSampler<K,V> extends
org.apache.hadoop.mapreduce.lib.partition.InputSampler.SplitSampler<K, V>
implements Sampler<K,V> {
/**
* Create a SplitSampler sampling <em>all</em> splits.
* Takes the first numSamples / numSplits records from each split.
* @param numSamples Total number of samples to obtain from all selected
* splits.
*/
public SplitSampler(int numSamples) {
this(numSamples, Integer.MAX_VALUE);
}
/**
* Create a new SplitSampler.
* @param numSamples Total number of samples to obtain from all selected
* splits.
* @param maxSplitsSampled The maximum number of splits to examine.
*/
public SplitSampler(int numSamples, int maxSplitsSampled) {
super(numSamples, maxSplitsSampled);
}
/**
* From each split sampled, take the first numSamples / numSplits records.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
int samplesPerSplit = numSamples / splitsToSample;
long records = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
samples.add(key);
key = reader.createKey();
++records;
if ((i+1) * samplesPerSplit <= records) {
break;
}
}
reader.close();
}
return (K[])samples.toArray();
}
}
/**
* Sample from random points in the input.
* General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
* each split.
*/
public static class RandomSampler<K,V> extends
org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler<K, V>
implements Sampler<K,V> {
/**
* Create a new RandomSampler sampling <em>all</em> splits.
* This will read every split at the client, which is very expensive.
* @param freq Probability with which a key will be chosen.
* @param numSamples Total number of samples to obtain from all selected
* splits.
*/
public RandomSampler(double freq, int numSamples) {
this(freq, numSamples, Integer.MAX_VALUE);
}
/**
* Create a new RandomSampler.
* @param freq Probability with which a key will be chosen.
* @param numSamples Total number of samples to obtain from all selected
* splits.
* @param maxSplitsSampled The maximum number of splits to examine.
*/
public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
super(freq, numSamples, maxSplitsSampled);
}
/**
* Randomize the split order, then take the specified number of keys from
* each split sampled, where each key is selected with the specified
* probability and possibly replaced by a subsequently selected key when
* the quota of keys from that split is satisfied.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>(numSamples);
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
Random r = new Random();
long seed = r.nextLong();
r.setSeed(seed);
LOG.debug("seed: " + seed);
// shuffle splits
for (int i = 0; i < splits.length; ++i) {
InputSplit tmp = splits[i];
int j = r.nextInt(splits.length);
splits[i] = splits[j];
splits[j] = tmp;
}
// our target rate is in terms of the maximum number of sample splits,
// but we accept the possibility of sampling additional splits to hit
// the target sample keyset
for (int i = 0; i < splitsToSample ||
(i < splits.length && samples.size() < numSamples); ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
if (r.nextDouble() <= freq) {
if (samples.size() < numSamples) {
samples.add(key);
} else {
// When exceeding the maximum number of samples, replace a
// random element with this one, then adjust the frequency
// to reflect the possibility of existing elements being
// pushed out
int ind = r.nextInt(numSamples);
if (ind != numSamples) {
samples.set(ind, key);
}
freq *= (numSamples - 1) / (double) numSamples;
}
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
}
/**
* Sample from s splits at regular intervals.
* Useful for sorted data.
*/
public static class IntervalSampler<K,V> extends
org.apache.hadoop.mapreduce.lib.partition.InputSampler.IntervalSampler<K, V>
implements Sampler<K,V> {
/**
* Create a new IntervalSampler sampling <em>all</em> splits.
* @param freq The frequency with which records will be emitted.
*/
public IntervalSampler(double freq) {
this(freq, Integer.MAX_VALUE);
}
/**
* Create a new IntervalSampler.
* @param freq The frequency with which records will be emitted.
* @param maxSplitsSampled The maximum number of splits to examine.
* @see #getSample
*/
public IntervalSampler(double freq, int maxSplitsSampled) {
super(freq, maxSplitsSampled);
}
/**
* For each split sampled, emit when the ratio of the number of records
* retained to the total record count is less than the specified
* frequency.
*/
@SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
ArrayList<K> samples = new ArrayList<K>();
int splitsToSample = Math.min(maxSplitsSampled, splits.length);
int splitStep = splits.length / splitsToSample;
long records = 0;
long kept = 0;
for (int i = 0; i < splitsToSample; ++i) {
RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
job, Reporter.NULL);
K key = reader.createKey();
V value = reader.createValue();
while (reader.next(key, value)) {
++records;
if ((double) kept / records < freq) {
++kept;
samples.add(key);
key = reader.createKey();
}
}
reader.close();
}
return (K[])samples.toArray();
}
}
}

View File

@ -96,8 +96,8 @@ K[] getSample(InputFormat<K,V> inf, Job job)
*/
public static class SplitSampler<K,V> implements Sampler<K,V> {
private final int numSamples;
private final int maxSplitsSampled;
protected final int numSamples;
protected final int maxSplitsSampled;
/**
* Create a SplitSampler sampling <em>all</em> splits.
@ -157,9 +157,9 @@ public K[] getSample(InputFormat<K,V> inf, Job job)
* each split.
*/
public static class RandomSampler<K,V> implements Sampler<K,V> {
private double freq;
private final int numSamples;
private final int maxSplitsSampled;
protected double freq;
protected final int numSamples;
protected final int maxSplitsSampled;
/**
* Create a new RandomSampler sampling <em>all</em> splits.
@ -249,8 +249,8 @@ public K[] getSample(InputFormat<K,V> inf, Job job)
* Useful for sorted data.
*/
public static class IntervalSampler<K,V> implements Sampler<K,V> {
private final double freq;
private final int maxSplitsSampled;
protected final double freq;
protected final int maxSplitsSampled;
/**
* Create a new IntervalSampler sampling <em>all</em> splits.

View File

@ -17,23 +17,26 @@
*/
package org.apache.hadoop.mapreduce.lib.partition;
import static org.junit.Assert.assertEquals;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.Test;
public class TestInputSampler {
@ -47,6 +50,24 @@ static class SequentialSplit extends InputSplit {
public int getInit() { return i; }
}
static class MapredSequentialSplit implements org.apache.hadoop.mapred.InputSplit {
private int i;
MapredSequentialSplit(int i) {
this.i = i;
}
@Override
public long getLength() { return 0; }
@Override
public String[] getLocations() { return new String[0]; }
public int getInit() { return i; }
@Override
public void write(DataOutput out) throws IOException {
}
@Override
public void readFields(DataInput in) throws IOException {
}
}
static class TestInputSamplerIF
extends InputFormat<IntWritable,NullWritable> {
@ -90,6 +111,71 @@ public void close() { }
}
static class TestMapredInputSamplerIF extends TestInputSamplerIF implements
org.apache.hadoop.mapred.InputFormat<IntWritable,NullWritable> {
TestMapredInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
super(maxDepth, numSplits, splitInit);
}
@Override
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
int numSplits) throws IOException {
List<InputSplit> splits = null;
try {
splits = getSplits(Job.getInstance(job));
} catch (InterruptedException e) {
throw new IOException(e);
}
org.apache.hadoop.mapred.InputSplit[] retVals =
new org.apache.hadoop.mapred.InputSplit[splits.size()];
for (int i = 0; i < splits.size(); ++i) {
MapredSequentialSplit split = new MapredSequentialSplit(
((SequentialSplit) splits.get(i)).getInit());
retVals[i] = split;
}
return retVals;
}
@Override
public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
JobConf job, Reporter reporter) throws IOException {
return new org.apache.hadoop.mapred.RecordReader
<IntWritable, NullWritable>() {
private final IntWritable i =
new IntWritable(((MapredSequentialSplit)split).getInit());
private int maxVal = i.get() + maxDepth + 1;
@Override
public boolean next(IntWritable key, NullWritable value)
throws IOException {
i.set(i.get() + 1);
return i.get() < maxVal;
}
@Override
public IntWritable createKey() {
return new IntWritable(i.get());
}
@Override
public NullWritable createValue() {
return NullWritable.get();
}
@Override
public long getPos() throws IOException {
return 0;
}
@Override
public void close() throws IOException {
}
@Override
public float getProgress() throws IOException {
return 0;
}
};
}
}
/**
* Verify SplitSampler contract, that an equal number of records are taken
* from the first splits.
@ -118,6 +204,36 @@ public void testSplitSampler() throws Exception {
}
}
/**
* Verify SplitSampler contract in mapred.lib.InputSampler, which is added
* back for binary compatibility of M/R 1.x
*/
@Test (timeout = 30000)
@SuppressWarnings("unchecked") // IntWritable comparator not typesafe
public void testMapredSplitSampler() throws Exception {
final int TOT_SPLITS = 15;
final int NUM_SPLITS = 5;
final int STEP_SAMPLE = 5;
final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
sampler = new org.apache.hadoop.mapred.lib.InputSampler.SplitSampler
<IntWritable,NullWritable>(NUM_SAMPLES, NUM_SPLITS);
int inits[] = new int[TOT_SPLITS];
for (int i = 0; i < TOT_SPLITS; ++i) {
inits[i] = i * STEP_SAMPLE;
}
Object[] samples = sampler.getSample(
new TestMapredInputSamplerIF(100000, TOT_SPLITS, inits),
new JobConf());
assertEquals(NUM_SAMPLES, samples.length);
Arrays.sort(samples, new IntWritable.Comparator());
for (int i = 0; i < NUM_SAMPLES; ++i) {
// mapred.lib.InputSampler.SplitSampler has a sampling step
assertEquals(i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE),
((IntWritable)samples[i]).get());
}
}
/**
* Verify IntervalSampler contract, that samples are taken at regular
* intervals from the given splits.
@ -146,4 +262,33 @@ public void testIntervalSampler() throws Exception {
}
}
/**
* Verify IntervalSampler in mapred.lib.InputSampler, which is added back
* for binary compatibility of M/R 1.x
*/
@Test (timeout = 30000)
@SuppressWarnings("unchecked") // IntWritable comparator not typesafe
public void testMapredIntervalSampler() throws Exception {
final int TOT_SPLITS = 16;
final int PER_SPLIT_SAMPLE = 4;
final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
final double FREQ = 1.0 / TOT_SPLITS;
org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
<IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
int inits[] = new int[TOT_SPLITS];
for (int i = 0; i < TOT_SPLITS; ++i) {
inits[i] = i;
}
Job ignored = Job.getInstance();
Object[] samples = sampler.getSample(new TestInputSamplerIF(
NUM_SAMPLES, TOT_SPLITS, inits), ignored);
assertEquals(NUM_SAMPLES, samples.length);
Arrays.sort(samples, new IntWritable.Comparator());
for (int i = 0; i < NUM_SAMPLES; ++i) {
assertEquals(i,
((IntWritable)samples[i]).get());
}
}
}