diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 68b780fa3a..cb74b984cd 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -224,6 +224,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5159. Change ValueAggregatorJob to add APIs which can support
     binary compatibility with hadoop-1 examples. (Zhijie Shen via vinodkv)
 
+    MAPREDUCE-5157. Bring back old sampler related code so that we can support
+    binary compatibility with hadoop-1 sorter example. (Zhijie Shen via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
index b99b0c7d6e..a55abe646c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/InputSampler.java
@@ -19,10 +19,18 @@
 package org.apache.hadoop.mapred.lib;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
 
 @InterfaceAudience.Public
@@ -30,6 +38,8 @@ public class InputSampler<K,V> extends
   org.apache.hadoop.mapreduce.lib.partition.InputSampler<K,V> {
 
+  private static final Log LOG = LogFactory.getLog(InputSampler.class);
+
   public InputSampler(JobConf conf) {
     super(conf);
   }
@@ -38,4 +48,219 @@ public static <K,V> void writePartitionFile(JobConf job, Sampler<K,V> sampler)
       throws IOException, ClassNotFoundException, InterruptedException {
     writePartitionFile(new Job(job), sampler);
   }
+
+  /**
+   * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
+   */
+  public interface Sampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.Sampler<K,V> {
+    /**
+     * For a given job, collect and return a subset of the keys from the
+     * input data.
+     */
+    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
+  }
+
+  /**
+   * Samples the first n records from s splits.
+   * Inexpensive way to sample random data.
+   */
+  public static class SplitSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.SplitSampler<K,V>
+      implements Sampler<K,V> {
+
+    /**
+     * Create a SplitSampler sampling all splits.
+     * Takes the first numSamples / numSplits records from each split.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public SplitSampler(int numSamples) {
+      this(numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new SplitSampler.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public SplitSampler(int numSamples, int maxSplitsSampled) {
+      super(numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * From each split sampled, take the first numSamples / numSplits records.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      int samplesPerSplit = numSamples / splitsToSample;
+      long records = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          samples.add(key);
+          key = reader.createKey();
+          ++records;
+          if ((i+1) * samplesPerSplit <= records) {
+            break;
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from random points in the input.
+   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
+   * each split.
+   */
+  public static class RandomSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.RandomSampler<K,V>
+      implements Sampler<K,V> {
+
+    /**
+     * Create a new RandomSampler sampling all splits.
+     * This will read every split at the client, which is very expensive.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     */
+    public RandomSampler(double freq, int numSamples) {
+      this(freq, numSamples, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new RandomSampler.
+     * @param freq Probability with which a key will be chosen.
+     * @param numSamples Total number of samples to obtain from all selected
+     *                   splits.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     */
+    public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
+      super(freq, numSamples, maxSplitsSampled);
+    }
+
+    /**
+     * Randomize the split order, then take the specified number of keys from
+     * each split sampled, where each key is selected with the specified
+     * probability and possibly replaced by a subsequently selected key when
+     * the quota of keys from that split is satisfied.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>(numSamples);
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+      LOG.debug("seed: " + seed);
+      // shuffle splits
+      for (int i = 0; i < splits.length; ++i) {
+        InputSplit tmp = splits[i];
+        int j = r.nextInt(splits.length);
+        splits[i] = splits[j];
+        splits[j] = tmp;
+      }
+      // our target rate is in terms of the maximum number of sample splits,
+      // but we accept the possibility of sampling additional splits to hit
+      // the target sample keyset
+      for (int i = 0; i < splitsToSample ||
+                     (i < splits.length && samples.size() < numSamples); ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
+            Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          if (r.nextDouble() <= freq) {
+            if (samples.size() < numSamples) {
+              samples.add(key);
+            } else {
+              // When exceeding the maximum number of samples, replace a
+              // random element with this one, then adjust the frequency
+              // to reflect the possibility of existing elements being
+              // pushed out
+              int ind = r.nextInt(numSamples);
+              if (ind != numSamples) {
+                samples.set(ind, key);
+              }
+              freq *= (numSamples - 1) / (double) numSamples;
+            }
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
+
+  /**
+   * Sample from s splits at regular intervals.
+   * Useful for sorted data.
+   */
+  public static class IntervalSampler<K,V> extends
+      org.apache.hadoop.mapreduce.lib.partition.InputSampler.IntervalSampler<K,V>
+      implements Sampler<K,V> {
+
+    /**
+     * Create a new IntervalSampler sampling all splits.
+     * @param freq The frequency with which records will be emitted.
+     */
+    public IntervalSampler(double freq) {
+      this(freq, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Create a new IntervalSampler.
+     * @param freq The frequency with which records will be emitted.
+     * @param maxSplitsSampled The maximum number of splits to examine.
+     * @see #getSample
+     */
+    public IntervalSampler(double freq, int maxSplitsSampled) {
+      super(freq, maxSplitsSampled);
+    }
+
+    /**
+     * For each split sampled, emit when the ratio of the number of records
+     * retained to the total record count is less than the specified
+     * frequency.
+     */
+    @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
+    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
+      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
+      ArrayList<K> samples = new ArrayList<K>();
+      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
+      int splitStep = splits.length / splitsToSample;
+      long records = 0;
+      long kept = 0;
+      for (int i = 0; i < splitsToSample; ++i) {
+        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
+            job, Reporter.NULL);
+        K key = reader.createKey();
+        V value = reader.createValue();
+        while (reader.next(key, value)) {
+          ++records;
+          if ((double) kept / records < freq) {
+            ++kept;
+            samples.add(key);
+            key = reader.createKey();
+          }
+        }
+        reader.close();
+      }
+      return (K[])samples.toArray();
+    }
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
index 72b47f282e..742316875c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/InputSampler.java
@@ -96,8 +96,8 @@ K[] getSample(InputFormat<K,V> inf, Job job)
    */
   public static class SplitSampler<K,V> implements Sampler<K,V> {
 
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a SplitSampler sampling all splits.
@@ -157,9 +157,9 @@ public K[] getSample(InputFormat<K,V> inf, Job job)
    * each split.
    */
   public static class RandomSampler<K,V> implements Sampler<K,V> {
-    private double freq;
-    private final int numSamples;
-    private final int maxSplitsSampled;
+    protected double freq;
+    protected final int numSamples;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new RandomSampler sampling all splits.
@@ -249,8 +249,8 @@ public K[] getSample(InputFormat<K,V> inf, Job job)
    * Useful for sorted data.
    */
   public static class IntervalSampler<K,V> implements Sampler<K,V> {
-    private final double freq;
-    private final int maxSplitsSampled;
+    protected final double freq;
+    protected final int maxSplitsSampled;
 
     /**
      * Create a new IntervalSampler sampling all splits.
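Context for the two file changes above: the restored org.apache.hadoop.mapred.lib classes let a hadoop-1 style driver, like the hadoop-1 sorter example, keep compiling and running against the old API, while the private-to-protected change lets them reuse the new implementation's fields. A minimal sketch of such a driver follows; the SequenceFile input with Text keys, the paths, and the TotalOrderPartitioner wiring are illustrative assumptions, not part of this patch:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.lib.InputSampler;
    import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    public class SamplerDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf();
        // Hypothetical input: SequenceFiles keyed by Text.
        job.setInputFormat(SequenceFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("/input"));
        // The sampled partition boundaries are written here, to be read
        // back by the TotalOrderPartitioner of each reduce task.
        TotalOrderPartitioner.setPartitionFile(job, new Path("/tmp/partitions"));
        job.setPartitionerClass(TotalOrderPartitioner.class);
        // Restored hadoop-1 API: take the first 100 / 10 records from each
        // of up to 10 splits as partition samples.
        InputSampler.Sampler<Text, Text> sampler =
            new InputSampler.SplitSampler<Text, Text>(100, 10);
        InputSampler.writePartitionFile(job, sampler);
      }
    }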
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
index b35e843df7..d2bcd6f9aa 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/partition/TestInputSampler.java
@@ -17,23 +17,26 @@
  */
 package org.apache.hadoop.mapreduce.lib.partition;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Test;
 
 public class TestInputSampler {
 
@@ -47,6 +50,24 @@ static class SequentialSplit extends InputSplit {
     public int getInit() { return i; }
   }
 
+  static class MapredSequentialSplit implements org.apache.hadoop.mapred.InputSplit {
+    private int i;
+    MapredSequentialSplit(int i) {
+      this.i = i;
+    }
+    @Override
+    public long getLength() { return 0; }
+    @Override
+    public String[] getLocations() { return new String[0]; }
+    public int getInit() { return i; }
+    @Override
+    public void write(DataOutput out) throws IOException {
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+    }
+  }
+
   static class TestInputSamplerIF
       extends InputFormat<IntWritable,NullWritable> {
 
@@ -90,6 +111,71 @@ public void close() { }
     }
   }
 
+  static class TestMapredInputSamplerIF extends TestInputSamplerIF implements
+      org.apache.hadoop.mapred.InputFormat<IntWritable,NullWritable> {
+
+    TestMapredInputSamplerIF(int maxDepth, int numSplits, int... splitInit) {
+      super(maxDepth, numSplits, splitInit);
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job,
+        int numSplits) throws IOException {
+      List<InputSplit> splits = null;
+      try {
+        splits = getSplits(Job.getInstance(job));
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+      org.apache.hadoop.mapred.InputSplit[] retVals =
+          new org.apache.hadoop.mapred.InputSplit[splits.size()];
+      for (int i = 0; i < splits.size(); ++i) {
+        MapredSequentialSplit split = new MapredSequentialSplit(
+            ((SequentialSplit) splits.get(i)).getInit());
+        retVals[i] = split;
+      }
+      return retVals;
+    }
+
+    @Override
+    public org.apache.hadoop.mapred.RecordReader<IntWritable, NullWritable>
+        getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
+            JobConf job, Reporter reporter) throws IOException {
+      return new org.apache.hadoop.mapred.RecordReader
+          <IntWritable, NullWritable>() {
+        private final IntWritable i =
+            new IntWritable(((MapredSequentialSplit)split).getInit());
+        private int maxVal = i.get() + maxDepth + 1;
+
+        @Override
+        public boolean next(IntWritable key, NullWritable value)
+            throws IOException {
+          i.set(i.get() + 1);
+          return i.get() < maxVal;
+        }
+        @Override
+        public IntWritable createKey() {
+          return new IntWritable(i.get());
+        }
+        @Override
+        public NullWritable createValue() {
+          return NullWritable.get();
+        }
+        @Override
+        public long getPos() throws IOException {
+          return 0;
+        }
+        @Override
+        public void close() throws IOException {
+        }
+        @Override
+        public float getProgress() throws IOException {
+          return 0;
+        }
+      };
+    }
+  }
+
   /**
    * Verify SplitSampler contract, that an equal number of records are taken
    * from the first splits.
@@ -118,6 +204,36 @@ public void testSplitSampler() throws Exception {
     }
   }
 
+  /**
+   * Verify SplitSampler contract in mapred.lib.InputSampler, which is added
+   * back for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredSplitSampler() throws Exception {
+    final int TOT_SPLITS = 15;
+    final int NUM_SPLITS = 5;
+    final int STEP_SAMPLE = 5;
+    final int NUM_SAMPLES = NUM_SPLITS * STEP_SAMPLE;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.SplitSampler
+            <IntWritable,NullWritable>(NUM_SAMPLES, NUM_SPLITS);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i * STEP_SAMPLE;
+    }
+    Object[] samples = sampler.getSample(
+        new TestMapredInputSamplerIF(100000, TOT_SPLITS, inits),
+        new JobConf());
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      // mapred.lib.InputSampler.SplitSampler has a sampling step
+      assertEquals(i % STEP_SAMPLE + TOT_SPLITS * (i / STEP_SAMPLE),
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
   /**
    * Verify IntervalSampler contract, that samples are taken at regular
    * intervals from the given splits.
@@ -146,4 +262,33 @@ public void testIntervalSampler() throws Exception {
     }
   }
 
+  /**
+   * Verify IntervalSampler in mapred.lib.InputSampler, which is added back
+   * for binary compatibility of M/R 1.x
+   */
+  @Test (timeout = 30000)
+  @SuppressWarnings("unchecked") // IntWritable comparator not typesafe
+  public void testMapredIntervalSampler() throws Exception {
+    final int TOT_SPLITS = 16;
+    final int PER_SPLIT_SAMPLE = 4;
+    final int NUM_SAMPLES = TOT_SPLITS * PER_SPLIT_SAMPLE;
+    final double FREQ = 1.0 / TOT_SPLITS;
+    org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
+        sampler = new org.apache.hadoop.mapred.lib.InputSampler.IntervalSampler
+            <IntWritable,NullWritable>(FREQ, NUM_SAMPLES);
+    int inits[] = new int[TOT_SPLITS];
+    for (int i = 0; i < TOT_SPLITS; ++i) {
+      inits[i] = i;
+    }
+    Job ignored = Job.getInstance();
+    Object[] samples = sampler.getSample(new TestInputSamplerIF(
+        NUM_SAMPLES, TOT_SPLITS, inits), ignored);
+    assertEquals(NUM_SAMPLES, samples.length);
+    Arrays.sort(samples, new IntWritable.Comparator());
+    for (int i = 0; i < NUM_SAMPLES; ++i) {
+      assertEquals(i,
+          ((IntWritable)samples[i]).get());
+    }
+  }
+
 }
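Note that the patch also restores RandomSampler, while the test class above exercises only SplitSampler and IntervalSampler. A test for it in the same style might look like the sketch below; this is illustrative only (not part of the patch), and it asserts just the sample count because RandomSampler is randomized by design:

      /**
       * Sketch of a RandomSampler test mirroring the ones above; the split
       * and record counts are assumptions chosen so the sample quota is met
       * with overwhelming probability.
       */
      @Test (timeout = 30000)
      public void testMapredRandomSampler() throws Exception {
        final int TOT_SPLITS = 10;
        final int RECORDS_PER_SPLIT = 100;
        final int NUM_SAMPLES = 20;
        org.apache.hadoop.mapred.lib.InputSampler.Sampler<IntWritable,NullWritable>
            sampler = new org.apache.hadoop.mapred.lib.InputSampler.RandomSampler
                <IntWritable,NullWritable>(0.5, NUM_SAMPLES, TOT_SPLITS);
        int inits[] = new int[TOT_SPLITS];
        for (int i = 0; i < TOT_SPLITS; ++i) {
          inits[i] = i * RECORDS_PER_SPLIT;
        }
        Object[] samples = sampler.getSample(
            new TestMapredInputSamplerIF(RECORDS_PER_SPLIT, TOT_SPLITS, inits),
            new JobConf());
        // With ~1000 candidate records each kept with probability 0.5, the
        // quota of NUM_SAMPLES keys is reached essentially always.
        assertEquals(NUM_SAMPLES, samples.length);
      }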