diff --git a/hadoop-mapreduce-project/dev-support/jdiff/Apache_Hadoop_MapReduce_Common_2.6.0.xml b/hadoop-mapreduce-project/dev-support/jdiff/Apache_Hadoop_MapReduce_Common_2.6.0.xml
new file mode 100644
index 0000000000..66e206b059
--- /dev/null
+++ b/hadoop-mapreduce-project/dev-support/jdiff/Apache_Hadoop_MapReduce_Common_2.6.0.xml
@@ -0,0 +1,281 @@
+
+
+
+
+
+
+DistributedCache
is a facility provided by the Map-Reduce
+ framework to cache files (text, archives, jars etc.) needed by applications.
+
Applications specify the files, via urls (hdfs:// or http://) to be cached
+ via the {@link org.apache.hadoop.mapred.JobConf}. The
+ DistributedCache
assumes that the files specified via urls are
+ already present on the {@link FileSystem} at the path specified by the url
+ and are accessible by every machine in the cluster.
The framework will copy the necessary files on to the slave node before + any tasks for the job are executed on that node. Its efficiency stems from + the fact that the files are only copied once per job and the ability to + cache archives which are un-archived on the slaves.
+ +DistributedCache
can be used to distribute simple, read-only
+ data/text files and/or more complex types such as archives, jars etc.
+ Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
+ Jars may be optionally added to the classpath of the tasks, a rudimentary
+ software distribution mechanism. Files have execution permissions.
+ In older version of Hadoop Map/Reduce users could optionally ask for symlinks
+ to be created in the working directory of the child task. In the current
+ version symlinks are always created. If the URL does not have a fragment
+ the name of the file or directory will be used. If multiple files or
+ directories map to the same link name, the last one added, will be used. All
+ others will not even be downloaded.
DistributedCache
tracks modification timestamps of the cache
+ files. Clearly the cache files should not be modified by the application
+ or externally while the job is executing.
Here is an illustrative example on how to use the
+ DistributedCache
:
+ + It is also very common to use the DistributedCache by using + {@link org.apache.hadoop.util.GenericOptionsParser}. + + This class includes methods that should be used by users + (specifically those mentioned in the example above, as well + as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}), + as well as methods intended for use by the MapReduce framework + (e.g., {@link org.apache.hadoop.mapred.JobClient}). + + @see org.apache.hadoop.mapred.JobConf + @see org.apache.hadoop.mapred.JobClient + @see org.apache.hadoop.mapreduce.Job]]> + + + + ++ // Setting up the cache for the application + + 1. Copy the requisite files to theFileSystem
: + + $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat + $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip + $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar + $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar + $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz + $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz + + 2. Setup the application'sJobConf
: + + JobConf job = new JobConf(); + DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), + job); + DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job); + DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); + DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job); + DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job); + DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job); + + 3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper} + or {@link org.apache.hadoop.mapred.Reducer}: + + public static class MapClass extends MapReduceBase + implements Mapper<K, V, K, V> { + + private Path[] localArchives; + private Path[] localFiles; + + public void configure(JobConf job) { + // Get the cached archives/files + File f = new File("./map.zip/some/file/in/zip.txt"); + } + + public void map(K key, V value, + OutputCollector<K, V> output, Reporter reporter) + throws IOException { + // Use data from the cached archives/files here + // ... + // ... + output.collect(k, v); + } + } + +
JobTracker
.]]>
+ ClusterStatus
provides clients with information such as:
+ JobTracker
.
+ Clients can query for the latest ClusterStatus
, via
+ {@link JobClient#getClusterStatus()}.
Counters
represent global counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
can be of
+ any {@link Enum} type.
+
+ Counters
are bunched into {@link Group}s, each comprising of
+ counters from a particular Enum
class.]]>
+
Group
handles localization of the class name and the
+ counter names.
false
to ensure that individual input files are never split-up
+ so that {@link Mapper}s process entire files.
+
+ @param fs the file system that the file is on
+ @param filename the file name to check
+ @return is this file splitable?]]>
+ FileInputFormat
is the base class for all file-based
+ InputFormat
s. This provides a generic implementation of
+ {@link #getSplits(JobConf, int)}.
+ Subclasses of FileInputFormat
can also override the
+ {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
+ not split-up and are processed as a whole by {@link Mapper}s.]]>
+ false
otherwise]]>
+ Note: The following is valid only if the {@link OutputCommitter}
+ is {@link FileOutputCommitter}. If OutputCommitter
is not
+ a FileOutputCommitter
, the task's temporary output
+ directory is same as {@link #getOutputPath(JobConf)} i.e.
+ ${mapreduce.output.fileoutputformat.outputdir}$
Some applications need to create/write-to side-files, which differ from + the actual job-outputs. + +
In such cases there could be issues with 2 instances of the same TIP + (running simultaneously e.g. speculative tasks) trying to open/write-to the + same file (path) on HDFS. Hence the application-writer will have to pick + unique names per task-attempt (e.g. using the attemptid, say + attempt_200709221812_0001_m_000000_0), not just per TIP.
+ +To get around this the Map-Reduce framework helps the application-writer + out by maintaining a special + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} + sub-directory for each task-attempt on HDFS where the output of the + task-attempt goes. On successful completion of the task-attempt the files + in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) + are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, the + framework discards the sub-directory of unsuccessful task-attempts. This + is completely transparent to the application.
+ +The application-writer can take advantage of this by creating any + side-files required in ${mapreduce.task.output.dir} during execution + of his reduce-task i.e. via {@link #getWorkOutputPath(JobConf)}, and the + framework will move them out similarly - thus she doesn't have to pick + unique paths per task-attempt.
+ +Note: the value of ${mapreduce.task.output.dir} during + execution of a particular task-attempt is actually + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}, and this value is + set by the map-reduce framework. So, just create any side-files in the + path returned by {@link #getWorkOutputPath(JobConf)} from map/reduce + task to take advantage of this feature.
+ +The entire discussion holds true for maps of jobs with + reducer=NONE (i.e. 0 reduces) since output of the map, in that case, + goes directly to HDFS.
+ + @return the {@link Path} to the task's temporary output directory + for the map-reduce job.]]> +The given name is postfixed with the task type, 'm' for maps, 'r' for + reduces and the task partition number. For example, give a name 'test' + running on the first map o the job the generated name will be + 'test-m-00000'.
+ + @param conf the configuration for the job. + @param name the name to make unique. + @return a unique name accross all tasks of the job.]]> +This method uses the {@link #getUniqueName} method to make the file name + unique for the task.
+ + @param conf the configuration for the job. + @param name the name for the file. + @return a unique path accross all tasks of the job.]]> +Note: The split is a logical split of the inputs and the + input files are not physically split into chunks. For e.g. a split could + be <input-file-path, start, offset> tuple. + + @param job job configuration. + @param numSplits the desired number of splits, a hint. + @return an array of {@link InputSplit}s for the job.]]> +
RecordReader
to respect
+ record boundaries while processing the logical split to present a
+ record-oriented view to the individual task.
+
+ @param split the {@link InputSplit}
+ @param job the job that this split belongs to
+ @return a {@link RecordReader}]]>
+ The Map-Reduce framework relies on the InputFormat
of the
+ job to:
+
InputSplit
for processing by
+ the {@link Mapper}.
+ The default behavior of file-based {@link InputFormat}s, typically + sub-classes of {@link FileInputFormat}, is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of the input files. However, the {@link FileSystem} blocksize of + the input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Clearly, logical splits based on input-size is insufficient for many
+ applications since record boundaries are to be respected. In such cases, the
+ application has to also implement a {@link RecordReader} on whom lies the
+ responsibilty to respect record-boundaries and present a record-oriented
+ view of the logical InputSplit
to the individual task.
+
+ @see InputSplit
+ @see RecordReader
+ @see JobClient
+ @see FileInputFormat]]>
+
String
s.
+ @throws IOException]]>
+ Typically, it presents a byte-oriented view on the input and is the + responsibility of {@link RecordReader} of the job to process this and present + a record-oriented view. + + @see InputFormat + @see RecordReader]]> +
JobClient
provides facilities to submit jobs, track their
+ progress, access component-tasks' reports/logs, get the Map-Reduce cluster
+ status information etc.
+
+ The job submission process involves: +
JobClient
to submit
+ the job and monitor its progress.
+
+ Here is an example on how to use JobClient
:
+ ++ // Create a new JobConf + JobConf job = new JobConf(new Configuration(), MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + job.setInputPath(new Path("in")); + job.setOutputPath(new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setReducerClass(MyJob.MyReducer.class); + + // Submit the job, then poll for progress until the job is complete + JobClient.runJob(job); +
At times clients would chain map-reduce jobs to accomplish complex tasks + which cannot be done via a single map-reduce job. This is fairly easy since + the output of the job, typically, goes to distributed file-system and that + can be used as the input for the next job.
+ +However, this also means that the onus on ensuring jobs are complete + (success/failure) lies squarely on the clients. In such situations the + various job-control options are: +
false
otherwise.]]>
+ false
otherwise.]]>
+ This comparator should be provided if the equivalence rules for keys + for sorting the intermediates are different from those for grouping keys + before each call to + {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.
+ +For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + in a single call to the reduce function if K1 and K2 compare as equal.
+ +Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + how keys are sorted, this can be used in conjunction to simulate + secondary sort on values.
+ +Note: This is not a guarantee of the combiner sort being + stable in any sense. (In any case, with the order of available + map-outputs to the combiner being non-deterministic, it wouldn't make + that much sense.)
+ + @param theClass the comparator class to be used for grouping keys for the + combiner. It should implementRawComparator
.
+ @see #setOutputKeyComparatorClass(Class)]]>
+ For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + in a single call to the reduce function if K1 and K2 compare as equal.
+ +Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + how keys are sorted, this can be used in conjunction to simulate + secondary sort on values.
+ +Note: This is not a guarantee of the reduce sort being + stable in any sense. (In any case, with the order of available + map-outputs to the reduce being non-deterministic, it wouldn't make + that much sense.)
+ + @param theClass the comparator class to be used for grouping keys. + It should implementRawComparator
.
+ @see #setOutputKeyComparatorClass(Class)
+ @see #setCombinerKeyGroupingComparator(Class)]]>
+ The combiner is an application-specified aggregation operation, which + can help cut down the amount of data transferred between the + {@link Mapper} and the {@link Reducer}, leading to better performance.
+ +The framework may invoke the combiner 0, 1, or multiple times, in both + the mapper and reducer tasks. In general, the combiner is called as the + sort/merge result is written to disk. The combiner must: +
Typically the combiner is same as the Reducer
for the
+ job i.e. {@link #setReducerClass(Class)}.
true
if speculative execution be used for this job,
+ false
otherwise.]]>
+ false
.]]>
+ true
if speculative execution be
+ used for this job for map tasks,
+ false
otherwise.]]>
+ false
.]]>
+ true
if speculative execution be used
+ for reduce tasks for this job,
+ false
otherwise.]]>
+ false
.]]>
+ The number of maps is usually driven by the total size of the inputs + i.e. total number of blocks of the input files.
+ +The right level of parallelism for maps seems to be around 10-100 maps + per-node, although it has been set up to 300 or so for very cpu-light map + tasks. Task setup takes awhile, so it is best if the maps take at least a + minute to execute.
+ +The default behavior of file-based {@link InputFormat}s is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of input files. However, the {@link FileSystem} blocksize of the + input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Thus, if you expect 10TB of input data and have a blocksize of 128MB, + you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is + used to set it even higher.
+ + @param n the number of map tasks for this job. + @see InputFormat#getSplits(JobConf, int) + @see FileInputFormat + @see FileSystem#getDefaultBlockSize() + @see FileStatus#getBlockSize()]]> +The right number of reduces seems to be 0.95
or
+ 1.75
multiplied by (<no. of nodes> *
+
+ mapreduce.tasktracker.reduce.tasks.maximum).
+
With 0.95
all of the reduces can launch immediately and
+ start transfering map outputs as the maps finish. With 1.75
+ the faster nodes will finish their first round of reduces and launch a
+ second wave of reduces doing a much better job of load balancing.
Increasing the number of reduces increases the framework overhead, but + increases load balancing and lowers the cost of failures.
+ +The scaling factors above are slightly less than whole numbers to + reserve a few reduce slots in the framework for speculative-tasks, failures + etc.
+ +It is legal to set the number of reduce-tasks to zero
.
In this case the output of the map-tasks directly go to distributed + file-system, to the path set by + {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the + framework doesn't sort the map-outputs before writing it out to HDFS.
+ + @param n the number of reduce tasks for this job.]]> +zero
, i.e. any failed map-task results in
+ the job being declared as {@link JobStatus#FAILED}.
+
+ @return the maximum percentage of map tasks that can fail without
+ the job being aborted.]]>
+ zero
, i.e. any failed reduce-task results
+ in the job being declared as {@link JobStatus#FAILED}.
+
+ @return the maximum percentage of reduce tasks that can fail without
+ the job being aborted.]]>
+ The debug command, run on the node where the map failed, is:
++ ++ $script $stdout $stderr $syslog $jobconf. +
The script file is distributed through {@link DistributedCache} + APIs. The script needs to be symlinked.
+ +Here is an example on how to submit a script +
+ + @param mDbgScript the script name]]> ++ job.setMapDebugScript("./myscript"); + DistributedCache.createSymlink(job); + DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); +
The debug command, run on the node where the map failed, is:
++ ++ $script $stdout $stderr $syslog $jobconf. +
The script file is distributed through {@link DistributedCache} + APIs. The script file needs to be symlinked
+ +Here is an example on how to submit a script +
+ + @param rDbgScript the script name]]> ++ job.setReduceDebugScript("./myscript"); + DistributedCache.createSymlink(job); + DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); +
This is typically used by application-writers to implement chaining of + Map-Reduce jobs in an asynchronous manner.
+ + @param uri the job end notification uri + @see JobStatus + @see Job Completion and Chaining]]> +
+ ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/
.
+ This directory is exposed to the users through
+ mapreduce.job.local.dir
.
+ So, the tasks can use this space
+ as scratch space and share files among them.
+ This value is available as System property also.
+
+ @return The localized job specific shared directory]]>
+ $key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ $key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ $key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ JobConf
is the primary interface for a user to describe a
+ map-reduce job to the Hadoop framework for execution. The framework tries to
+ faithfully execute the job as-is described by JobConf
, however:
+ JobConf
typically specifies the {@link Mapper}, combiner
+ (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
+ {@link OutputFormat} implementations to be used etc.
+
+
Optionally JobConf
is used to specify other advanced facets
+ of the job such as Comparator
s to be used, files to be put in
+ the {@link DistributedCache}, whether or not intermediate and/or job outputs
+ are to be compressed (and how), debugability via user-provided scripts
+ ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
+ for doing post-processing on task logs, task's stdout, stderr, syslog.
+ and etc.
Here is an example on how to configure a job via JobConf
:
+ + @see JobClient + @see ClusterStatus + @see Tool + @see DistributedCache]]> ++ // Create a new JobConf + JobConf job = new JobConf(new Configuration(), MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + FileInputFormat.setInputPaths(job, new Path("in")); + FileOutputFormat.setOutputPath(job, new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setCombinerClass(MyJob.MyReducer.class); + job.setReducerClass(MyJob.MyReducer.class); + + job.setInputFormat(SequenceFileInputFormat.class); + job.setOutputFormat(SequenceFileOutputFormat.class); +
+ JobID.getTaskIDsPattern("200707121733", null); ++ which will return : +
"job_200707121733_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @return a regex pattern matching JobIDs]]> +
job_200707121733_0003
, which represents the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse JobID strings, but rather + use appropriate constructors or {@link #forName(String)} method. + + @see TaskID + @see TaskAttemptID]]> +
Applications can use the {@link Reporter} provided to report progress + or just indicate that they are alive. In scenarios where the application + takes significant amount of time to process individual key/value + pairs, this is crucial since the framework might assume that the task has + timed-out and kill that task. The other way of avoiding this is to set + + mapreduce.task.timeout to a high-enough value (or even zero for no + time-outs).
+ + @param key the input key. + @param value the input value. + @param output collects mapped keys and values. + @param reporter facility to report progress.]]> +The Hadoop Map-Reduce framework spawns one map task for each
+ {@link InputSplit} generated by the {@link InputFormat} for the job.
+ Mapper
implementations can access the {@link JobConf} for the
+ job via the {@link JobConfigurable#configure(JobConf)} and initialize
+ themselves. Similarly they can use the {@link Closeable#close()} method for
+ de-initialization.
The framework then calls
+ {@link #map(Object, Object, OutputCollector, Reporter)}
+ for each key/value pair in the InputSplit
for that task.
All intermediate values associated with a given output key are
+ subsequently grouped by the framework, and passed to a {@link Reducer} to
+ determine the final output. Users can control the grouping by specifying
+ a Comparator
via
+ {@link JobConf#setOutputKeyComparatorClass(Class)}.
The grouped Mapper
outputs are partitioned per
+ Reducer
. Users can control which keys (and hence records) go to
+ which Reducer
by implementing a custom {@link Partitioner}.
+
+
Users can optionally specify a combiner
, via
+ {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the
+ intermediate outputs, which helps to cut down the amount of data transferred
+ from the Mapper
to the Reducer
.
+
+
The intermediate, grouped outputs are always stored in
+ {@link SequenceFile}s. Applications can specify if and how the intermediate
+ outputs are to be compressed and which {@link CompressionCodec}s are to be
+ used via the JobConf
.
If the job has
+ zero
+ reduces then the output of the Mapper
is directly written
+ to the {@link FileSystem} without grouping by keys.
Example:
++ ++ public class MyMapper<K extends WritableComparable, V extends Writable> + extends MapReduceBase implements Mapper<K, V, K, V> { + + static enum MyCounters { NUM_RECORDS } + + private String mapTaskId; + private String inputFile; + private int noRecords = 0; + + public void configure(JobConf job) { + mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID); + inputFile = job.get(JobContext.MAP_INPUT_FILE); + } + + public void map(K key, V val, + OutputCollector<K, V> output, Reporter reporter) + throws IOException { + // Process the <key, value> pair (assume this takes a while) + // ... + // ... + + // Let the framework know that we are alive, and kicking! + // reporter.progress(); + + // Process some more + // ... + // ... + + // Increment the no. of <key, value> pairs processed + ++noRecords; + + // Increment counters + reporter.incrCounter(NUM_RECORDS, 1); + + // Every 100 records update application-level status + if ((noRecords%100) == 0) { + reporter.setStatus(mapTaskId + " processed " + noRecords + + " from input-file: " + inputFile); + } + + // Output the result + output.collect(key, val); + } + } +
Applications may write a custom {@link MapRunnable} to exert greater
+ control on map processing e.g. multi-threaded Mapper
s etc.
Mapping of input records to output records is complete when this method + returns.
+ + @param input the {@link RecordReader} to read the input records. + @param output the {@link OutputCollector} to collect the outputrecords. + @param reporter {@link Reporter} to report progress, status-updates etc. + @throws IOException]]> +MapRunnable
can exert greater
+ control on map processing e.g. multi-threaded, asynchronous mappers etc.
+
+ @see Mapper]]>
+ RecordReader
's for MultiFileSplit
's.
+ @see MultiFileSplit]]>
+ OutputCollector
is the generalization of the facility
+ provided by the Map-Reduce framework to collect data output by either the
+ Mapper
or the Reducer
i.e. intermediate outputs
+ or the output of the job.
false
otherwise
+ @throws IOException
+ @see #recoverTask(TaskAttemptContext)]]>
+ The Map-Reduce framework relies on the OutputCommitter
of
+ the job to:
+
The Map-Reduce framework relies on the OutputFormat
of the
+ job to:
+
key
.]]>
+ Partitioner
controls the partitioning of the keys of the
+ intermediate map-outputs. The key (or a subset of the key) is used to derive
+ the partition, typically by a hash function. The total number of partitions
+ is the same as the number of reduce tasks for the job. Hence this controls
+ which of the m
reduce tasks the intermediate key (and hence the
+ record) is sent for reduction.
+
+ @see Reducer]]>
+ 1.0
.
+ @throws IOException]]>
+ RecordReader
, typically, converts the byte-oriented view of
+ the input, provided by the InputSplit
, and presents a
+ record-oriented view for the {@link Mapper} & {@link Reducer} tasks for
+ processing. It thus assumes the responsibility of processing record
+ boundaries and presenting the tasks with keys and values.
RecordWriter
implementations write the job outputs to the
+ {@link FileSystem}.
+
+ @see OutputFormat]]>
+
The framework calls this method for each
+ <key, (list of values)>
pair in the grouped inputs.
+ Output values must be of the same type as input values. Input keys must
+ not be altered. The framework will reuse the key and value objects
+ that are passed into the reduce, therefore the application should clone
+ the objects they want to keep a copy of. In many cases, all values are
+ combined into zero or one value.
+
Output pairs are collected with calls to + {@link OutputCollector#collect(Object,Object)}.
+ +Applications can use the {@link Reporter} provided to report progress + or just indicate that they are alive. In scenarios where the application + takes a significant amount of time to process individual key/value + pairs, this is crucial since the framework might assume that the task has + timed-out and kill that task. The other way of avoiding this is to set + + mapreduce.task.timeout to a high-enough value (or even zero for no + time-outs).
+ + @param key the key. + @param values the list of values to reduce. + @param output to collect keys and combined values. + @param reporter facility to report progress.]]> +Reducer
s for the job is set by the user via
+ {@link JobConf#setNumReduceTasks(int)}. Reducer
implementations
+ can access the {@link JobConf} for the job via the
+ {@link JobConfigurable#configure(JobConf)} method and initialize themselves.
+ Similarly they can use the {@link Closeable#close()} method for
+ de-initialization.
+
+ Reducer
has 3 primary phases:
Reducer
is input the grouped output of a {@link Mapper}.
+ In the phase the framework, for each Reducer
, fetches the
+ relevant partition of the output of all the Mapper
s, via HTTP.
+
The framework groups Reducer
inputs by key
s
+ (since different Mapper
s may have output the same key) in this
+ stage.
The shuffle and sort phases occur simultaneously i.e. while outputs are + being fetched they are merged.
+ +If equivalence rules for keys while grouping the intermediates are
+ different from those for grouping keys before reduction, then one may
+ specify a Comparator
via
+ {@link JobConf#setOutputValueGroupingComparator(Class)}.Since
+ {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to
+ control how intermediate keys are grouped, these can be used in conjunction
+ to simulate secondary sort on values.
In this phase the
+ {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
+ method is called for each <key, (list of values)>
pair in
+ the grouped inputs.
The output of the reduce task is typically written to the + {@link FileSystem} via + {@link OutputCollector#collect(Object, Object)}.
+The output of the Reducer
is not re-sorted.
Example:
++ + @see Mapper + @see Partitioner + @see Reporter + @see MapReduceBase]]> ++ public class MyReducer<K extends WritableComparable, V extends Writable> + extends MapReduceBase implements Reducer<K, V, K, V> { + + static enum MyCounters { NUM_RECORDS } + + private String reduceTaskId; + private int noKeys = 0; + + public void configure(JobConf job) { + reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID); + } + + public void reduce(K key, Iterator<V> values, + OutputCollector<K, V> output, + Reporter reporter) + throws IOException { + + // Process + int noValues = 0; + while (values.hasNext()) { + V value = values.next(); + + // Increment the no. of values for this key + ++noValues; + + // Process the <key, value> pair (assume this takes a while) + // ... + // ... + + // Let the framework know that we are alive, and kicking! + if ((noValues%10) == 0) { + reporter.progress(); + } + + // Process some more + // ... + // ... + + // Output the <key, value> + output.collect(key, value); + } + + // Increment the no. of <key, list of values> pairs processed + ++noKeys; + + // Increment counters + reporter.incrCounter(NUM_RECORDS, 1); + + // Every 100 keys update application-level status + if ((noKeys%100) == 0) { + reporter.setStatus(reduceTaskId + " processed " + noKeys); + } + } + } +
Reporter
+ provided to report progress or just indicate that they are alive. In
+ scenarios where the application takes significant amount of time to
+ process individual key/value pairs, this is crucial since the framework
+ might assume that the task has timed-out and kill that task.
+
+ Applications can also update {@link Counters} via the provided
+ Reporter
.
false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ Clients can get hold of RunningJob
via the {@link JobClient}
+ and then query the running-job for details such as name, configuration,
+ progress etc.
false
otherwise.]]>
+ false
otherwise.]]>
+ This feature can be used when map/reduce tasks crashes deterministically on + certain input. This happens due to bugs in the map/reduce function. The usual + course would be to fix these bugs. But sometimes this is not possible; + perhaps the bug is in third party libraries for which the source code is + not available. Due to this, the task never reaches to completion even with + multiple attempts and complete data for that task is lost.
+ +With this feature, only a small portion of data is lost surrounding + the bad record, which may be acceptable for some user applications. + see {@link SkipBadRecords#setMapperMaxSkipRecords(Configuration, long)}
+ +The skipping mode gets kicked off after certain no of failures + see {@link SkipBadRecords#setAttemptsToStartSkipping(Configuration, int)}
+ +In the skipping mode, the map/reduce task maintains the record range which + is getting processed at all times. Before giving the input to the + map/reduce function, it sends this record range to the Task tracker. + If task crashes, the Task tracker knows which one was the last reported + range. On further attempts that range get skipped.
]]> ++ TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null); ++ which will return : +
"attempt_[^_]*_[0-9]*_m_000001_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param isMap whether the tip is a map, or null + @param taskId taskId number, or null + @param attemptId the task attempt number, or null + @return a regex pattern matching TaskAttemptIDs]]> +
+ TaskAttemptID.getTaskAttemptIDsPattern(null, null, TaskType.MAP, 1, null); ++ which will return : +
"attempt_[^_]*_[0-9]*_m_000001_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param type the {@link TaskType} + @param taskId taskId number, or null + @param attemptId the task attempt number, or null + @return a regex pattern matching TaskAttemptIDs]]> +
attempt_200707121733_0003_m_000005_0
, which represents the
+ zeroth task attempt for the fifth map task in the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse TaskAttemptID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskID]]> +
+ TaskID.getTaskIDsPattern(null, null, true, 1); ++ which will return : +
"task_[^_]*_[0-9]*_m_000001*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param isMap whether the tip is a map, or null + @param taskId taskId number, or null + @return a regex pattern matching TaskIDs + @deprecated Use {@link TaskID#getTaskIDsPattern(String, Integer, TaskType, + Integer)}]]> +
+ TaskID.getTaskIDsPattern(null, null, true, 1); ++ which will return : +
"task_[^_]*_[0-9]*_m_000001*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param type the {@link TaskType}, or null + @param taskId taskId number, or null + @return a regex pattern matching TaskIDs]]> +
task_200707121733_0003_m_000005
, which represents the
+ fifth map task in the third job running at the jobtracker
+ started at 200707121733
.
+ + Applications should never construct or parse TaskID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskAttemptID]]> +
) }]]> +
mapperConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainMapper, this is done by the addMapper for the last mapper in the chain
+
+
+ @param job job's JobConf to add the Mapper class.
+ @param klass the Mapper class to add.
+ @param inputKeyClass mapper input key class.
+ @param inputValueClass mapper input value class.
+ @param outputKeyClass mapper output key class.
+ @param outputValueClass mapper output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param mapperConf a JobConf with the configuration for the Mapper
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+ super.configure(...)
should be
+ invoked at the beginning of the overwriter method.]]>
+ super.close()
should be
+ invoked at the end of the overwriter method.]]>
+ [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainMapper, this is done by the addMapper for the last mapper in the chain.
+
+ ChainMapper usage pattern:
+
+ + ... + conf.setJobName("chain"); + conf.setInputFormat(TextInputFormat.class); + conf.setOutputFormat(TextOutputFormat.class); + + JobConf mapAConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + JobConf mapBConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + JobConf reduceConf = new JobConf(false); + ... + ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + ... + + JobClient jc = new JobClient(conf); + RunningJob job = jc.submitJob(conf); + ... +]]> +
reducerConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainReducer, this is done by the setReducer or the addMapper for the last
+ element in the chain.
+
+ @param job job's JobConf to add the Reducer class.
+ @param klass the Reducer class to add.
+ @param inputKeyClass reducer input key class.
+ @param inputValueClass reducer input value class.
+ @param outputKeyClass reducer output key class.
+ @param outputValueClass reducer output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param reducerConf a JobConf with the configuration for the Reducer
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+ mapperConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainMapper, this is done by the addMapper for the last mapper in the chain
+ .
+
+ @param job chain job's JobConf to add the Mapper class.
+ @param klass the Mapper class to add.
+ @param inputKeyClass mapper input key class.
+ @param inputValueClass mapper input value class.
+ @param outputKeyClass mapper output key class.
+ @param outputValueClass mapper output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param mapperConf a JobConf with the configuration for the Mapper
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+ super.configure(...)
should be
+ invoked at the beginning of the overwriter method.]]>
+ map(...)
methods of the Mappers in the chain.]]>
+ super.close()
should be
+ invoked at the end of the overwriter method.]]>
+ [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainReducer, this is done by the setReducer or the addMapper for the last
+ element in the chain.
+
+ ChainReducer usage pattern:
+
+ + ... + conf.setJobName("chain"); + conf.setInputFormat(TextInputFormat.class); + conf.setOutputFormat(TextOutputFormat.class); + + JobConf mapAConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + JobConf mapBConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + JobConf reduceConf = new JobConf(false); + ... + ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + ... + + JobClient jc = new JobClient(conf); + RunningJob job = jc.submitJob(conf); + ... +]]> +
CombineFileSplit
's.
+ @see CombineFileSplit]]>
+ SequenceFileInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ TextInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ false
+ if it is single. If the name output is not defined it returns
+ false
]]>
+ super.close()
at the
+ end of their close()
+
+ @throws java.io.IOException thrown if any of the MultipleOutput files
+ could not be closed properly.]]>
+ map()
and reduce()
methods of the
+ Mapper
and Reducer
implementations.
+
+ Each additional output, or named output, may be configured with its own
+ OutputFormat
, with its own key class and with its own value
+ class.
+
+ A named output can be a single file or a multi file. The later is refered as
+ a multi named output.
+
+ A multi named output is an unbound set of files all sharing the same
+ OutputFormat
, key class and value class configuration.
+
+ When named outputs are used within a Mapper
implementation,
+ key/values written to a name output are not part of the reduce phase, only
+ key/values written to the job OutputCollector
are part of the
+ reduce phase.
+
+ MultipleOutputs supports counters, by default the are disabled. The counters
+ group is the {@link MultipleOutputs} class name.
+
+ The names of the counters are the same as the named outputs. For multi
+ named outputs the name of the counter is the concatenation of the named
+ output, and underscore '_' and the multiname.
+
+ Job configuration usage pattern is:
+ + + JobConf conf = new JobConf(); + + conf.setInputPath(inDir); + FileOutputFormat.setOutputPath(conf, outDir); + + conf.setMapperClass(MOMap.class); + conf.setReducerClass(MOReduce.class); + ... + + // Defines additional single text based output 'text' for the job + MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, + LongWritable.class, Text.class); + + // Defines additional multi sequencefile based output 'sequence' for the + // job + MultipleOutputs.addMultiNamedOutput(conf, "seq", + SequenceFileOutputFormat.class, + LongWritable.class, Text.class); + ... + + JobClient jc = new JobClient(); + RunningJob job = jc.submitJob(conf); + + ... ++ + Job configuration usage pattern is: +
+ + public class MOReduce implements + Reducer<WritableComparable, Writable> { + private MultipleOutputs mos; + + public void configure(JobConf conf) { + ... + mos = new MultipleOutputs(conf); + } + + public void reduce(WritableComparable key, Iterator<Writable> values, + OutputCollector output, Reporter reporter) + throws IOException { + ... + mos.getCollector("text", reporter).collect(key, new Text("Hello")); + mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye")); + mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau")); + ... + } + + public void close() throws IOException { + mos.close(); + ... + } + + } +]]> +
+ Map implementations using this MapRunnable must be thread-safe. +
+ The Map-Reduce job has to be configured to use this MapRunnable class (using
+ the JobConf.setMapRunnerClass method) and
+ the number of thread the thread-pool can use with the
+ mapred.map.multithreadedrunner.threads
property, its default
+ value is 10 threads.
+
]]> +
ClusterMetrics
provides clients with information such as:
+ Clients can query for the latest ClusterMetrics
, via
+ {@link Cluster#getClusterStatus()}.
Counters
represent global counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
is named by
+ an {@link Enum} and has a long for the value.
+
+ Counters
are bunched into Groups, each comprising of
+ counters from a particular Enum
class.]]>
+
Counters
holds per job/task counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
can be of
+ any {@link Enum} type.
+
+ Counters
are bunched into {@link CounterGroup}s, each
+ comprising of counters from a particular Enum
class.]]>
+
Note: The split is a logical split of the inputs and the + input files are not physically split into chunks. For e.g. a split could + be <input-file-path, start, offset> tuple. The InputFormat + also creates the {@link RecordReader} to read the {@link InputSplit}. + + @param context job configuration. + @return an array of {@link InputSplit}s for the job.]]> +
The Map-Reduce framework relies on the InputFormat
of the
+ job to:
+
InputSplit
for processing by
+ the {@link Mapper}.
+ The default behavior of file-based {@link InputFormat}s, typically + sub-classes of {@link FileInputFormat}, is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of the input files. However, the {@link FileSystem} blocksize of + the input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Clearly, logical splits based on input-size is insufficient for many
+ applications since record boundaries are to respected. In such cases, the
+ application has to also implement a {@link RecordReader} on whom lies the
+ responsibility to respect record-boundaries and present a record-oriented
+ view of the logical InputSplit
to the individual task.
+
+ @see InputSplit
+ @see RecordReader
+ @see FileInputFormat]]>
+
Typically, it presents a byte-oriented view on the input and is the + responsibility of {@link RecordReader} of the job to process this and present + a record-oriented view. + + @see InputFormat + @see RecordReader]]> +
Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ A Cluster will be created from the conf parameter only when it's needed.
+
+ @param conf the configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param conf the configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param status job status
+ @param conf job configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param ignored
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException
+ @deprecated Use {@link #getInstance()}]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param ignored
+ @param conf job configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException
+ @deprecated Use {@link #getInstance(Configuration)}]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param cluster cluster
+ @param status job status
+ @param conf job configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ false
.]]>
+ false
.]]>
+ false
.]]>
+ + Normally the user creates the application, describes various facets of the + job via {@link Job} and then submits the job and monitor its progress.
+ +Here is an example on how to submit a job:
+]]> ++ // Create a new Job + Job job = Job.getInstance(); + job.setJarByClass(MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + job.setInputPath(new Path("in")); + job.setOutputPath(new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setReducerClass(MyJob.MyReducer.class); + + // Submit the job, then poll for progress until the job is complete + job.waitForCompletion(true); +
job_200707121733_0003
, which represents the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse JobID strings, but rather + use appropriate constructors or {@link #forName(String)} method. + + @see TaskID + @see TaskAttemptID]]> +
The Hadoop Map-Reduce framework spawns one map task for each
+ {@link InputSplit} generated by the {@link InputFormat} for the job.
+ Mapper
implementations can access the {@link Configuration} for
+ the job via the {@link JobContext#getConfiguration()}.
+
+
The framework first calls
+ {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
+ {@link #map(Object, Object, Context)}
+ for each key/value pair in the InputSplit
. Finally
+ {@link #cleanup(Context)} is called.
All intermediate values associated with a given output key are + subsequently grouped by the framework, and passed to a {@link Reducer} to + determine the final output. Users can control the sorting and grouping by + specifying two key {@link RawComparator} classes.
+ +The Mapper
outputs are partitioned per
+ Reducer
. Users can control which keys (and hence records) go to
+ which Reducer
by implementing a custom {@link Partitioner}.
+
+
Users can optionally specify a combiner
, via
+ {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
+ intermediate outputs, which helps to cut down the amount of data transferred
+ from the Mapper
to the Reducer
.
+
+
Applications can specify if and how the intermediate
+ outputs are to be compressed and which {@link CompressionCodec}s are to be
+ used via the Configuration
.
If the job has zero
+ reduces then the output of the Mapper
is directly written
+ to the {@link OutputFormat} without sorting by keys.
Example:
++ ++ public class TokenCounterMapper + extends Mapper<Object, Text, Text, IntWritable>{ + + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, one); + } + } + } +
Applications may override the {@link #run(Context)} method to exert
+ greater control on map processing e.g. multi-threaded Mapper
s
+ etc.
false
otherwise
+ @see #recoverTask(TaskAttemptContext)
+ @deprecated Use {@link #isRecoverySupported(JobContext)} instead.]]>
+ false
otherwise
+ @throws IOException
+ @see #recoverTask(TaskAttemptContext)]]>
+ The Map-Reduce framework relies on the OutputCommitter
of
+ the job to:
+
The Map-Reduce framework relies on the OutputFormat
of the
+ job to:
+
key
.]]>
+ Partitioner
controls the partitioning of the keys of the
+ intermediate map-outputs. The key (or a subset of the key) is used to derive
+ the partition, typically by a hash function. The total number of partitions
+ is the same as the number of reduce tasks for the job. Hence this controls
+ which of the m
reduce tasks the intermediate key (and hence the
+ record) is sent for reduction.
+
+ Note: If you require your Partitioner class to obtain the Job's configuration
+ object, implement the {@link Configurable} interface.
+
+ @see Reducer]]>
+ RecordWriter
implementations write the job outputs to the
+ {@link FileSystem}.
+
+ @see OutputFormat]]>
+
Reducer
implementations
+ can access the {@link Configuration} for the job via the
+ {@link JobContext#getConfiguration()} method.
+
+ Reducer
has 3 primary phases:
The Reducer
copies the sorted output from each
+ {@link Mapper} using HTTP across the network.
The framework merge sorts Reducer
inputs by
+ key
s
+ (since different Mapper
s may have output the same key).
The shuffle and sort phases occur simultaneously i.e. while outputs are + being fetched they are merged.
+ +To achieve a secondary sort on the values returned by the value + iterator, the application should extend the key with the secondary + key and define a grouping comparator. The keys will be sorted using the + entire key, but will be grouped using the grouping comparator to decide + which keys and values are sent in the same call to reduce.The grouping + comparator is specified via + {@link Job#setGroupingComparatorClass(Class)}. The sort order is + controlled by + {@link Job#setSortComparatorClass(Class)}.
+ + + For example, say that you want to find duplicate web pages and tag them + all with the url of the "best" known example. You would set up the job + like: +In this phase the
+ {@link #reduce(Object, Iterable, Context)}
+ method is called for each <key, (collection of values)>
in
+ the sorted inputs.
The output of the reduce task is typically written to a + {@link RecordWriter} via + {@link Context#write(Object, Object)}.
+The output of the Reducer
is not re-sorted.
Example:
++ + @see Mapper + @see Partitioner]]> ++ public class IntSumReducer<Key> extends Reducer<Key,IntWritable, + Key,IntWritable> { + private IntWritable result = new IntWritable(); + + public void reduce(Key key, Iterable<IntWritable> values, + Context context) throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + } + } +
Counter
for the given counterName
]]>
+ counterName
.
+ @param counterName counter name
+ @return the Counter
for the given groupName
and
+ counterName
]]>
+ attempt_200707121733_0003_m_000005_0
, which represents the
+ zeroth task attempt for the fifth map task in the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse TaskAttemptID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskID]]> +
task_200707121733_0003_m_000005
, which represents the
+ fifth map task in the third job running at the jobtracker
+ started at 200707121733
.
+ + Applications should never construct or parse TaskID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskAttemptID]]> +
mapperConf
, have precedence over the job's Configuration. This
+ precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain +
+ + @param job + The job. + @param klass + the Mapper class to add. + @param inputKeyClass + mapper input key class. + @param inputValueClass + mapper input value class. + @param outputKeyClass + mapper output key class. + @param outputValueClass + mapper output value class. + @param mapperConf + a configuration for the Mapper class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed in a chain. This enables having + reusable specialized Mappers that can be combined to perform composite + operations within a single task. +
++ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use matching output and input key and + value classes as no conversion is done by the chaining code. +
+
+ Using the ChainMapper and the ChainReducer classes is possible to compose
+ Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain. +
+ ChainMapper usage pattern: + + ++ ... + Job = new Job(conf); + + Configuration mapAConf = new Configuration(false); + ... + ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + Configuration mapBConf = new Configuration(false); + ... + ChainMapper.addMapper(job, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + ... + + job.waitForComplettion(true); + ... +]]> +
reducerConf
, have precedence over the job's Configuration.
+ This precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainReducer, this is done by the setReducer or the addMapper for the last + element in the chain. +
+ + @param job + the job + @param klass + the Reducer class to add. + @param inputKeyClass + reducer input key class. + @param inputValueClass + reducer input value class. + @param outputKeyClass + reducer output key class. + @param outputValueClass + reducer output value class. + @param reducerConf + a configuration for the Reducer class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ mapperConf
, have precedence over the job's Configuration. This
+ precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the + chain. +
+ + @param job + The job. + @param klass + the Mapper class to add. + @param inputKeyClass + mapper input key class. + @param inputValueClass + mapper input value class. + @param outputKeyClass + mapper output key class. + @param outputValueClass + mapper output value class. + @param mapperConf + a configuration for the Mapper class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed after the Reducer or in a chain. This + enables having reusable specialized Mappers that can be combined to perform + composite operations within a single task. +
++ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use matching output and input key and + value classes as no conversion is done by the chaining code. +
+ Using the ChainMapper and the ChainReducer classes is possible to + compose Map/Reduce jobs that look like[MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainReducer, this is done by the setReducer or the addMapper for the last + element in the chain. +
+ ChainReducer usage pattern: + + ++ ... + Job = new Job(conf); + .... + + Configuration reduceConf = new Configuration(false); + ... + ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(job, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + ... + + job.waitForCompletion(true); + ... +]]> +
+ Implementations are responsible for writing the fields of the object + to PreparedStatement, and reading the fields of the object from the + ResultSet. + +
Example:
+ If we have the following table in the database : ++ CREATE TABLE MyTable ( + counter INTEGER NOT NULL, + timestamp BIGINT NOT NULL, + ); ++ then we can read/write the tuples from/to the table with : +
+ public class MyWritable implements Writable, DBWritable { + // Some data + private int counter; + private long timestamp; + + //Writable#write() implementation + public void write(DataOutput out) throws IOException { + out.writeInt(counter); + out.writeLong(timestamp); + } + + //Writable#readFields() implementation + public void readFields(DataInput in) throws IOException { + counter = in.readInt(); + timestamp = in.readLong(); + } + + public void write(PreparedStatement statement) throws SQLException { + statement.setInt(1, counter); + statement.setLong(2, timestamp); + } + + public void readFields(ResultSet resultSet) throws SQLException { + counter = resultSet.getInt(1); + timestamp = resultSet.getLong(2); + } + } +]]> +
CombineFileSplit
's.
+
+ @see CombineFileSplit]]>
+ SequenceFileInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ TextInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ false
to ensure that individual input files are never split-up
+ so that {@link Mapper}s process entire files.
+
+ @param context the job context
+ @param filename the file name to check
+ @return is this file splitable?]]>
+ FileInputFormat
is the base class for all file-based
+ InputFormat
s. This provides a generic implementation of
+ {@link #getSplits(JobContext)}.
+ Subclasses of FileInputFormat
can also override the
+ {@link #isSplitable(JobContext, Path)} method to ensure input-files are
+ not split-up and are processed as a whole by {@link Mapper}s.]]>
+ ) }]]> +
+ Mapper implementations using this MapRunnable must be thread-safe. +
+ The Map-Reduce job has to be configured with the mapper to use via + {@link #setMapperClass(Configuration, Class)} and + the number of thread the thread-pool can use with the + {@link #getNumberOfThreads(Configuration) method. The default + value is 10 threads. +
]]> +
Mapper.Context
for custom implementations]]>
+ false
otherwise]]>
+ Some applications need to create/write-to side-files, which differ from + the actual job-outputs. + +
In such cases there could be issues with 2 instances of the same TIP + (running simultaneously e.g. speculative tasks) trying to open/write-to the + same file (path) on HDFS. Hence the application-writer will have to pick + unique names per task-attempt (e.g. using the attemptid, say + attempt_200709221812_0001_m_000000_0), not just per TIP.
+ +To get around this the Map-Reduce framework helps the application-writer + out by maintaining a special + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} + sub-directory for each task-attempt on HDFS where the output of the + task-attempt goes. On successful completion of the task-attempt the files + in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) + are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, the + framework discards the sub-directory of unsuccessful task-attempts. This + is completely transparent to the application.
+ +The application-writer can take advantage of this by creating any + side-files required in a work directory during execution + of his task i.e. via + {@link #getWorkOutputPath(TaskInputOutputContext)}, and + the framework will move them out similarly - thus she doesn't have to pick + unique paths per task-attempt.
+ +The entire discussion holds true for maps of jobs with + reducer=NONE (i.e. 0 reduces) since output of the map, in that case, + goes directly to HDFS.
+ + @return the {@link Path} to the task's temporary output directory + for the map-reduce job.]]> +This method uses the {@link #getUniqueFile} method to make the file name + unique for the task.
+ + @param context the context for the task. + @param name the name for the file. + @param extension the extension for the file + @return a unique path accross all tasks of the job.]]> +close()
]]>
+ OutputFormat
, with its own key class and with its own value
+ class.
+
+
+ + Case two: to write data to different files provided by user +
+ ++ MultipleOutputs supports counters, by default they are disabled. The + counters group is the {@link MultipleOutputs} class name. The names of the + counters are the same as the output name. These count the number records + written to each output name. +
+ + Usage pattern for job submission: ++ + Job job = new Job(); + + FileInputFormat.setInputPath(job, inDir); + FileOutputFormat.setOutputPath(job, outDir); + + job.setMapperClass(MOMap.class); + job.setReducerClass(MOReduce.class); + ... + + // Defines additional single text based output 'text' for the job + MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, + LongWritable.class, Text.class); + + // Defines additional sequence-file based output 'sequence' for the job + MultipleOutputs.addNamedOutput(job, "seq", + SequenceFileOutputFormat.class, + LongWritable.class, Text.class); + ... + + job.waitForCompletion(true); + ... ++
+ Usage in Reducer: +
++ +String generateFileName(K k, V v) { + return k.toString() + "_" + v.toString(); + } + + public class MOReduce extends + Reducer<WritableComparable, Writable,WritableComparable, Writable> { + private MultipleOutputs mos; + public void setup(Context context) { + ... + mos = new MultipleOutputs(context); + } + + public void reduce(WritableComparable key, Iterator<Writable> values, + Context context) + throws IOException { + ... + mos.write("text", , key, new Text("Hello")); + mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a"); + mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b"); + mos.write(key, new Text("value"), generateFileName(key, new Text("value"))); + ... + } + + public void cleanup(Context) throws IOException { + mos.close(); + ... + } + + } +
+ When used in conjuction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, + MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat + from the old Hadoop API - ie, output can be written from the Reducer to more than one location. +
+ +
+ Use MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)
to write key and
+ value to a path specified by baseOutputPath
, with no need to specify a named output:
+
+ private MultipleOutputs+ +out; + + public void setup(Context context) { + out = new MultipleOutputs (context); + ... + } + + public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { + for (Text t : values) { + out.write(key, t, generateFileName(<parameter list...>)); + } + } + + protected void cleanup(Context context) throws IOException, InterruptedException { + out.close(); + } +
+ Use your own code in generateFileName()
to create a custom path to your results.
+ '/' characters in baseOutputPath
will be translated into directory levels in your file system.
+ Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc.
+ No call to context.write()
is necessary. See example generateFileName()
code below.
+
+ private String generateFileName(Text k) { + // expect Text k in format "Surname|Forename" + String[] kStr = k.toString().split("\\|"); + + String sName = kStr[0]; + String fName = kStr[1]; + + // example for k = Smith|John + // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc) + return sName + "/" + fName; + } ++ +
+ Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000.
+ To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+ instead of job.setOutputFormatClass(TextOutputFormat.class);
in your Hadoop job configuration.
+
The subarray to be used for the partitioning can be defined by means + of the following properties: +
+ +---+---+---+---+---+
+ | B | B | B | B | B |
+ +---+---+---+---+---+
+ 0 1 2 3 4
+ -5 -4 -3 -2 -1
+
+ The first row of numbers gives the position of the offsets 0...5 in
+ the array; the second row gives the corresponding negative offsets.
+ Contrary to Python, the specified subarray has byte i
+ and j
as first and last element, repectively, when
+ i
and j
are the left and right offset.
+
+ For Hadoop programs written in Java, it is advisable to use one of + the following static convenience methods for setting the offsets: +
Reducer.Context
for custom implementations]]>
+ DistributedCache
is a facility provided by the Map-Reduce
+ framework to cache files (text, archives, jars etc.) needed by applications.
+
+
+ Applications specify the files, via urls (hdfs:// or http://) to be cached
+ via the {@link org.apache.hadoop.mapred.JobConf}. The
+ DistributedCache
assumes that the files specified via urls are
+ already present on the {@link FileSystem} at the path specified by the url
+ and are accessible by every machine in the cluster.
The framework will copy the necessary files on to the slave node before + any tasks for the job are executed on that node. Its efficiency stems from + the fact that the files are only copied once per job and the ability to + cache archives which are un-archived on the slaves.
+ +DistributedCache
can be used to distribute simple, read-only
+ data/text files and/or more complex types such as archives, jars etc.
+ Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes.
+ Jars may be optionally added to the classpath of the tasks, a rudimentary
+ software distribution mechanism. Files have execution permissions.
+ In older version of Hadoop Map/Reduce users could optionally ask for symlinks
+ to be created in the working directory of the child task. In the current
+ version symlinks are always created. If the URL does not have a fragment
+ the name of the file or directory will be used. If multiple files or
+ directories map to the same link name, the last one added, will be used. All
+ others will not even be downloaded.
DistributedCache
tracks modification timestamps of the cache
+ files. Clearly the cache files should not be modified by the application
+ or externally while the job is executing.
Here is an illustrative example on how to use the
+ DistributedCache
:
+ + It is also very common to use the DistributedCache by using + {@link org.apache.hadoop.util.GenericOptionsParser}. + + This class includes methods that should be used by users + (specifically those mentioned in the example above, as well + as {@link DistributedCache#addArchiveToClassPath(Path, Configuration)}), + as well as methods intended for use by the MapReduce framework + (e.g., {@link org.apache.hadoop.mapred.JobClient}). + + @see org.apache.hadoop.mapred.JobConf + @see org.apache.hadoop.mapred.JobClient + @see org.apache.hadoop.mapreduce.Job]]> ++ // Setting up the cache for the application + + 1. Copy the requisite files to theFileSystem
: + + $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat + $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip + $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar + $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar + $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz + $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz + + 2. Setup the application'sJobConf
: + + JobConf job = new JobConf(); + DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), + job); + DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job); + DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job); + DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job); + DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job); + DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job); + + 3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper} + or {@link org.apache.hadoop.mapred.Reducer}: + + public static class MapClass extends MapReduceBase + implements Mapper<K, V, K, V> { + + private Path[] localArchives; + private Path[] localFiles; + + public void configure(JobConf job) { + // Get the cached archives/files + File f = new File("./map.zip/some/file/in/zip.txt"); + } + + public void map(K key, V value, + OutputCollector<K, V> output, Reporter reporter) + throws IOException { + // Use data from the cached archives/files here + // ... + // ... + output.collect(k, v); + } + } + +
JobTracker
.]]>
+ ClusterStatus
provides clients with information such as:
+ JobTracker
.
+ Clients can query for the latest ClusterStatus
, via
+ {@link JobClient#getClusterStatus()}.
Counters
represent global counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
can be of
+ any {@link Enum} type.
+
+ Counters
are bunched into {@link Group}s, each comprising of
+ counters from a particular Enum
class.]]>
+
Group
handles localization of the class name and the
+ counter names.
false
to ensure that individual input files are never split-up
+ so that {@link Mapper}s process entire files.
+
+ @param fs the file system that the file is on
+ @param filename the file name to check
+ @return is this file splitable?]]>
+ FileInputFormat
is the base class for all file-based
+ InputFormat
s. This provides a generic implementation of
+ {@link #getSplits(JobConf, int)}.
+ Subclasses of FileInputFormat
can also override the
+ {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
+ not split-up and are processed as a whole by {@link Mapper}s.]]>
+ false
otherwise]]>
+ Note: The following is valid only if the {@link OutputCommitter}
+ is {@link FileOutputCommitter}. If OutputCommitter
is not
+ a FileOutputCommitter
, the task's temporary output
+ directory is same as {@link #getOutputPath(JobConf)} i.e.
+ ${mapreduce.output.fileoutputformat.outputdir}$
Some applications need to create/write-to side-files, which differ from + the actual job-outputs. + +
In such cases there could be issues with 2 instances of the same TIP + (running simultaneously e.g. speculative tasks) trying to open/write-to the + same file (path) on HDFS. Hence the application-writer will have to pick + unique names per task-attempt (e.g. using the attemptid, say + attempt_200709221812_0001_m_000000_0), not just per TIP.
+ +To get around this the Map-Reduce framework helps the application-writer + out by maintaining a special + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} + sub-directory for each task-attempt on HDFS where the output of the + task-attempt goes. On successful completion of the task-attempt the files + in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) + are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, the + framework discards the sub-directory of unsuccessful task-attempts. This + is completely transparent to the application.
+ +The application-writer can take advantage of this by creating any + side-files required in ${mapreduce.task.output.dir} during execution + of his reduce-task i.e. via {@link #getWorkOutputPath(JobConf)}, and the + framework will move them out similarly - thus she doesn't have to pick + unique paths per task-attempt.
+ +Note: the value of ${mapreduce.task.output.dir} during + execution of a particular task-attempt is actually + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_{$taskid}, and this value is + set by the map-reduce framework. So, just create any side-files in the + path returned by {@link #getWorkOutputPath(JobConf)} from map/reduce + task to take advantage of this feature.
+ +The entire discussion holds true for maps of jobs with + reducer=NONE (i.e. 0 reduces) since output of the map, in that case, + goes directly to HDFS.
+ + @return the {@link Path} to the task's temporary output directory + for the map-reduce job.]]> +The given name is postfixed with the task type, 'm' for maps, 'r' for + reduces and the task partition number. For example, give a name 'test' + running on the first map o the job the generated name will be + 'test-m-00000'.
+ + @param conf the configuration for the job. + @param name the name to make unique. + @return a unique name accross all tasks of the job.]]> +This method uses the {@link #getUniqueName} method to make the file name + unique for the task.
+ + @param conf the configuration for the job. + @param name the name for the file. + @return a unique path accross all tasks of the job.]]> +Note: The split is a logical split of the inputs and the + input files are not physically split into chunks. For e.g. a split could + be <input-file-path, start, offset> tuple. + + @param job job configuration. + @param numSplits the desired number of splits, a hint. + @return an array of {@link InputSplit}s for the job.]]> +
RecordReader
to respect
+ record boundaries while processing the logical split to present a
+ record-oriented view to the individual task.
+
+ @param split the {@link InputSplit}
+ @param job the job that this split belongs to
+ @return a {@link RecordReader}]]>
+ The Map-Reduce framework relies on the InputFormat
of the
+ job to:
+
InputSplit
for processing by
+ the {@link Mapper}.
+ The default behavior of file-based {@link InputFormat}s, typically + sub-classes of {@link FileInputFormat}, is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of the input files. However, the {@link FileSystem} blocksize of + the input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Clearly, logical splits based on input-size is insufficient for many
+ applications since record boundaries are to be respected. In such cases, the
+ application has to also implement a {@link RecordReader} on whom lies the
+ responsibilty to respect record-boundaries and present a record-oriented
+ view of the logical InputSplit
to the individual task.
+
+ @see InputSplit
+ @see RecordReader
+ @see JobClient
+ @see FileInputFormat]]>
+
String
s.
+ @throws IOException]]>
+ Typically, it presents a byte-oriented view on the input and is the + responsibility of {@link RecordReader} of the job to process this and present + a record-oriented view. + + @see InputFormat + @see RecordReader]]> +
JobClient
provides facilities to submit jobs, track their
+ progress, access component-tasks' reports/logs, get the Map-Reduce cluster
+ status information etc.
+
+ The job submission process involves: +
JobClient
to submit
+ the job and monitor its progress.
+
+ Here is an example on how to use JobClient
:
+ + Job Control + ++ // Create a new JobConf + JobConf job = new JobConf(new Configuration(), MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + job.setInputPath(new Path("in")); + job.setOutputPath(new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setReducerClass(MyJob.MyReducer.class); + + // Submit the job, then poll for progress until the job is complete + JobClient.runJob(job); +
At times clients would chain map-reduce jobs to accomplish complex tasks + which cannot be done via a single map-reduce job. This is fairly easy since + the output of the job, typically, goes to distributed file-system and that + can be used as the input for the next job.
+ +However, this also means that the onus on ensuring jobs are complete + (success/failure) lies squarely on the clients. In such situations the + various job-control options are: +
false
otherwise.]]>
+ false
otherwise.]]>
+ For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + in a single call to the reduce function if K1 and K2 compare as equal.
+ +Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + how keys are sorted, this can be used in conjunction to simulate + secondary sort on values.
+ +Note: This is not a guarantee of the combiner sort being + stable in any sense. (In any case, with the order of available + map-outputs to the combiner being non-deterministic, it wouldn't make + that much sense.)
+ + @param theClass the comparator class to be used for grouping keys for the + combiner. It should implementRawComparator
.
+ @see #setOutputKeyComparatorClass(Class)]]>
+ For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed + in a single call to the reduce function if K1 and K2 compare as equal.
+ +Since {@link #setOutputKeyComparatorClass(Class)} can be used to control + how keys are sorted, this can be used in conjunction to simulate + secondary sort on values.
+ +Note: This is not a guarantee of the reduce sort being + stable in any sense. (In any case, with the order of available + map-outputs to the reduce being non-deterministic, it wouldn't make + that much sense.)
+ + @param theClass the comparator class to be used for grouping keys. + It should implementRawComparator
.
+ @see #setOutputKeyComparatorClass(Class)
+ @see #setCombinerKeyGroupingComparator(Class)]]>
+ The combiner is an application-specified aggregation operation, which + can help cut down the amount of data transferred between the + {@link Mapper} and the {@link Reducer}, leading to better performance.
+ +The framework may invoke the combiner 0, 1, or multiple times, in both + the mapper and reducer tasks. In general, the combiner is called as the + sort/merge result is written to disk. The combiner must: +
Typically the combiner is same as the Reducer
for the
+ job i.e. {@link #setReducerClass(Class)}.
true
if speculative execution be used for this job,
+ false
otherwise.]]>
+ false
.]]>
+ true
if speculative execution be
+ used for this job for map tasks,
+ false
otherwise.]]>
+ false
.]]>
+ true
if speculative execution be used
+ for reduce tasks for this job,
+ false
otherwise.]]>
+ false
.]]>
+ The number of maps is usually driven by the total size of the inputs + i.e. total number of blocks of the input files.
+ +The right level of parallelism for maps seems to be around 10-100 maps + per-node, although it has been set up to 300 or so for very cpu-light map + tasks. Task setup takes awhile, so it is best if the maps take at least a + minute to execute.
+ +The default behavior of file-based {@link InputFormat}s is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of input files. However, the {@link FileSystem} blocksize of the + input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Thus, if you expect 10TB of input data and have a blocksize of 128MB, + you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is + used to set it even higher.
+ + @param n the number of map tasks for this job. + @see InputFormat#getSplits(JobConf, int) + @see FileInputFormat + @see FileSystem#getDefaultBlockSize() + @see FileStatus#getBlockSize()]]> +The right number of reduces seems to be 0.95
or
+ 1.75
multiplied by (<no. of nodes> *
+
+ mapreduce.tasktracker.reduce.tasks.maximum).
+
With 0.95
all of the reduces can launch immediately and
+ start transfering map outputs as the maps finish. With 1.75
+ the faster nodes will finish their first round of reduces and launch a
+ second wave of reduces doing a much better job of load balancing.
Increasing the number of reduces increases the framework overhead, but + increases load balancing and lowers the cost of failures.
+ +The scaling factors above are slightly less than whole numbers to + reserve a few reduce slots in the framework for speculative-tasks, failures + etc.
+ + Reducer NONE + +It is legal to set the number of reduce-tasks to zero
.
In this case the output of the map-tasks directly go to distributed + file-system, to the path set by + {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the + framework doesn't sort the map-outputs before writing it out to HDFS.
+ + @param n the number of reduce tasks for this job.]]> +zero
, i.e. any failed map-task results in
+ the job being declared as {@link JobStatus#FAILED}.
+
+ @return the maximum percentage of map tasks that can fail without
+ the job being aborted.]]>
+ zero
, i.e. any failed reduce-task results
+ in the job being declared as {@link JobStatus#FAILED}.
+
+ @return the maximum percentage of reduce tasks that can fail without
+ the job being aborted.]]>
+ The debug command, run on the node where the map failed, is:
++ ++ $script $stdout $stderr $syslog $jobconf. +
The script file is distributed through {@link DistributedCache} + APIs. The script needs to be symlinked.
+ +Here is an example on how to submit a script +
+ + @param mDbgScript the script name]]> ++ job.setMapDebugScript("./myscript"); + DistributedCache.createSymlink(job); + DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); +
The debug command, run on the node where the map failed, is:
++ ++ $script $stdout $stderr $syslog $jobconf. +
The script file is distributed through {@link DistributedCache} + APIs. The script file needs to be symlinked
+ +Here is an example on how to submit a script +
+ + @param rDbgScript the script name]]> ++ job.setReduceDebugScript("./myscript"); + DistributedCache.createSymlink(job); + DistributedCache.addCacheFile("/debug/scripts/myscript#myscript"); +
This is typically used by application-writers to implement chaining of + Map-Reduce jobs in an asynchronous manner.
+ + @param uri the job end notification uri + @see JobStatus]]> +
+ ${mapreduce.cluster.local.dir}/taskTracker/$user/jobcache/$jobid/work/
.
+ This directory is exposed to the users through
+ mapreduce.job.local.dir
.
+ So, the tasks can use this space
+ as scratch space and share files among them.
+ This value is available as System property also.
+
+ @return The localized job specific shared directory]]>
+ + For backward compatibility, if the job configuration sets the + key {@link #MAPRED_TASK_MAXVMEM_PROPERTY}, that value is returned. + Otherwise, this method will return the larger of the values returned by + {@link #getMemoryForMapTask()} and {@link #getMemoryForReduceTask()} + after converting them into bytes. + + @return Memory required to run a task of this job, in bytes. + @see #setMaxVirtualMemoryForTask(long) + @deprecated Use {@link #getMemoryForMapTask()} and + {@link #getMemoryForReduceTask()}]]> +
$key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ $key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ $key
on
+ Linux or %key%
on Windows.
+
+ Example:
+ JobConf
is the primary interface for a user to describe a
+ map-reduce job to the Hadoop framework for execution. The framework tries to
+ faithfully execute the job as-is described by JobConf
, however:
+ JobConf
typically specifies the {@link Mapper}, combiner
+ (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and
+ {@link OutputFormat} implementations to be used etc.
+
+
Optionally JobConf
is used to specify other advanced facets
+ of the job such as Comparator
s to be used, files to be put in
+ the {@link DistributedCache}, whether or not intermediate and/or job outputs
+ are to be compressed (and how), debugability via user-provided scripts
+ ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
+ for doing post-processing on task logs, task's stdout, stderr, syslog.
+ and etc.
Here is an example on how to configure a job via JobConf
:
+ + @see JobClient + @see ClusterStatus + @see Tool + @see DistributedCache]]> ++ // Create a new JobConf + JobConf job = new JobConf(new Configuration(), MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + FileInputFormat.setInputPaths(job, new Path("in")); + FileOutputFormat.setOutputPath(job, new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setCombinerClass(MyJob.MyReducer.class); + job.setReducerClass(MyJob.MyReducer.class); + + job.setInputFormat(SequenceFileInputFormat.class); + job.setOutputFormat(SequenceFileOutputFormat.class); +
+ JobID.getTaskIDsPattern("200707121733", null); ++ which will return : +
"job_200707121733_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @return a regex pattern matching JobIDs]]> +
job_200707121733_0003
, which represents the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse JobID strings, but rather + use appropriate constructors or {@link #forName(String)} method. + + @see TaskID + @see TaskAttemptID]]> +
Applications can use the {@link Reporter} provided to report progress + or just indicate that they are alive. In scenarios where the application + takes significant amount of time to process individual key/value + pairs, this is crucial since the framework might assume that the task has + timed-out and kill that task. The other way of avoiding this is to set + + mapreduce.task.timeout to a high-enough value (or even zero for no + time-outs).
+ + @param key the input key. + @param value the input value. + @param output collects mapped keys and values. + @param reporter facility to report progress.]]> +The Hadoop Map-Reduce framework spawns one map task for each
+ {@link InputSplit} generated by the {@link InputFormat} for the job.
+ Mapper
implementations can access the {@link JobConf} for the
+ job via the {@link JobConfigurable#configure(JobConf)} and initialize
+ themselves. Similarly they can use the {@link Closeable#close()} method for
+ de-initialization.
The framework then calls
+ {@link #map(Object, Object, OutputCollector, Reporter)}
+ for each key/value pair in the InputSplit
for that task.
All intermediate values associated with a given output key are
+ subsequently grouped by the framework, and passed to a {@link Reducer} to
+ determine the final output. Users can control the grouping by specifying
+ a Comparator
via
+ {@link JobConf#setOutputKeyComparatorClass(Class)}.
The grouped Mapper
outputs are partitioned per
+ Reducer
. Users can control which keys (and hence records) go to
+ which Reducer
by implementing a custom {@link Partitioner}.
+
+
Users can optionally specify a combiner
, via
+ {@link JobConf#setCombinerClass(Class)}, to perform local aggregation of the
+ intermediate outputs, which helps to cut down the amount of data transferred
+ from the Mapper
to the Reducer
.
+
+
The intermediate, grouped outputs are always stored in
+ {@link SequenceFile}s. Applications can specify if and how the intermediate
+ outputs are to be compressed and which {@link CompressionCodec}s are to be
+ used via the JobConf
.
If the job has
+ zero
+ reduces then the output of the Mapper
is directly written
+ to the {@link FileSystem} without grouping by keys.
Example:
++ ++ public class MyMapper<K extends WritableComparable, V extends Writable> + extends MapReduceBase implements Mapper<K, V, K, V> { + + static enum MyCounters { NUM_RECORDS } + + private String mapTaskId; + private String inputFile; + private int noRecords = 0; + + public void configure(JobConf job) { + mapTaskId = job.get(JobContext.TASK_ATTEMPT_ID); + inputFile = job.get(JobContext.MAP_INPUT_FILE); + } + + public void map(K key, V val, + OutputCollector<K, V> output, Reporter reporter) + throws IOException { + // Process the <key, value> pair (assume this takes a while) + // ... + // ... + + // Let the framework know that we are alive, and kicking! + // reporter.progress(); + + // Process some more + // ... + // ... + + // Increment the no. of <key, value> pairs processed + ++noRecords; + + // Increment counters + reporter.incrCounter(NUM_RECORDS, 1); + + // Every 100 records update application-level status + if ((noRecords%100) == 0) { + reporter.setStatus(mapTaskId + " processed " + noRecords + + " from input-file: " + inputFile); + } + + // Output the result + output.collect(key, val); + } + } +
Applications may write a custom {@link MapRunnable} to exert greater
+ control on map processing e.g. multi-threaded Mapper
s etc.
Mapping of input records to output records is complete when this method + returns.
+ + @param input the {@link RecordReader} to read the input records. + @param output the {@link OutputCollector} to collect the outputrecords. + @param reporter {@link Reporter} to report progress, status-updates etc. + @throws IOException]]> +MapRunnable
can exert greater
+ control on map processing e.g. multi-threaded, asynchronous mappers etc.
+
+ @see Mapper]]>
+ RecordReader
's for MultiFileSplit
's.
+ @see MultiFileSplit]]>
+ OutputCollector
is the generalization of the facility
+ provided by the Map-Reduce framework to collect data output by either the
+ Mapper
or the Reducer
i.e. intermediate outputs
+ or the output of the job.
false
otherwise
+ @throws IOException
+ @see #recoverTask(TaskAttemptContext)]]>
+ The Map-Reduce framework relies on the OutputCommitter
of
+ the job to:
+
The Map-Reduce framework relies on the OutputFormat
of the
+ job to:
+
key
.]]>
+ Partitioner
controls the partitioning of the keys of the
+ intermediate map-outputs. The key (or a subset of the key) is used to derive
+ the partition, typically by a hash function. The total number of partitions
+ is the same as the number of reduce tasks for the job. Hence this controls
+ which of the m
reduce tasks the intermediate key (and hence the
+ record) is sent for reduction.
+
+ @see Reducer]]>
+ 1.0
.
+ @throws IOException]]>
+ RecordReader
, typically, converts the byte-oriented view of
+ the input, provided by the InputSplit
, and presents a
+ record-oriented view for the {@link Mapper} and {@link Reducer} tasks for
+ processing. It thus assumes the responsibility of processing record
+ boundaries and presenting the tasks with keys and values.
RecordWriter
implementations write the job outputs to the
+ {@link FileSystem}.
+
+ @see OutputFormat]]>
+
The framework calls this method for each
+ <key, (list of values)>
pair in the grouped inputs.
+ Output values must be of the same type as input values. Input keys must
+ not be altered. The framework will reuse the key and value objects
+ that are passed into the reduce, therefore the application should clone
+ the objects they want to keep a copy of. In many cases, all values are
+ combined into zero or one value.
+
Output pairs are collected with calls to + {@link OutputCollector#collect(Object,Object)}.
+ +Applications can use the {@link Reporter} provided to report progress + or just indicate that they are alive. In scenarios where the application + takes a significant amount of time to process individual key/value + pairs, this is crucial since the framework might assume that the task has + timed-out and kill that task. The other way of avoiding this is to set + + mapreduce.task.timeout to a high-enough value (or even zero for no + time-outs).
+ + @param key the key. + @param values the list of values to reduce. + @param output to collect keys and combined values. + @param reporter facility to report progress.]]> +Reducer
s for the job is set by the user via
+ {@link JobConf#setNumReduceTasks(int)}. Reducer
implementations
+ can access the {@link JobConf} for the job via the
+ {@link JobConfigurable#configure(JobConf)} method and initialize themselves.
+ Similarly they can use the {@link Closeable#close()} method for
+ de-initialization.
+
+ Reducer
has 3 primary phases:
Reducer
is input the grouped output of a {@link Mapper}.
+ In the phase the framework, for each Reducer
, fetches the
+ relevant partition of the output of all the Mapper
s, via HTTP.
+
The framework groups Reducer
inputs by key
s
+ (since different Mapper
s may have output the same key) in this
+ stage.
The shuffle and sort phases occur simultaneously i.e. while outputs are + being fetched they are merged.
+ + SecondarySort + +If equivalence rules for keys while grouping the intermediates are
+ different from those for grouping keys before reduction, then one may
+ specify a Comparator
via
+ {@link JobConf#setOutputValueGroupingComparator(Class)}.Since
+ {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to
+ control how intermediate keys are grouped, these can be used in conjunction
+ to simulate secondary sort on values.
In this phase the
+ {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
+ method is called for each <key, (list of values)>
pair in
+ the grouped inputs.
The output of the reduce task is typically written to the + {@link FileSystem} via + {@link OutputCollector#collect(Object, Object)}.
+The output of the Reducer
is not re-sorted.
Example:
++ + @see Mapper + @see Partitioner + @see Reporter + @see MapReduceBase]]> ++ public class MyReducer<K extends WritableComparable, V extends Writable> + extends MapReduceBase implements Reducer<K, V, K, V> { + + static enum MyCounters { NUM_RECORDS } + + private String reduceTaskId; + private int noKeys = 0; + + public void configure(JobConf job) { + reduceTaskId = job.get(JobContext.TASK_ATTEMPT_ID); + } + + public void reduce(K key, Iterator<V> values, + OutputCollector<K, V> output, + Reporter reporter) + throws IOException { + + // Process + int noValues = 0; + while (values.hasNext()) { + V value = values.next(); + + // Increment the no. of values for this key + ++noValues; + + // Process the <key, value> pair (assume this takes a while) + // ... + // ... + + // Let the framework know that we are alive, and kicking! + if ((noValues%10) == 0) { + reporter.progress(); + } + + // Process some more + // ... + // ... + + // Output the <key, value> + output.collect(key, value); + } + + // Increment the no. of <key, list of values> pairs processed + ++noKeys; + + // Increment counters + reporter.incrCounter(NUM_RECORDS, 1); + + // Every 100 keys update application-level status + if ((noKeys%100) == 0) { + reporter.setStatus(reduceTaskId + " processed " + noKeys); + } + } + } +
Reporter
+ provided to report progress or just indicate that they are alive. In
+ scenarios where the application takes significant amount of time to
+ process individual key/value pairs, this is crucial since the framework
+ might assume that the task has timed-out and kill that task.
+
+ Applications can also update {@link Counters} via the provided
+ Reporter
.
false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ Clients can get hold of RunningJob
via the {@link JobClient}
+ and then query the running-job for details such as name, configuration,
+ progress etc.
false
otherwise.]]>
+ false
otherwise.]]>
+ This feature can be used when map/reduce tasks crashes deterministically on + certain input. This happens due to bugs in the map/reduce function. The usual + course would be to fix these bugs. But sometimes this is not possible; + perhaps the bug is in third party libraries for which the source code is + not available. Due to this, the task never reaches to completion even with + multiple attempts and complete data for that task is lost.
+ +With this feature, only a small portion of data is lost surrounding + the bad record, which may be acceptable for some user applications. + see {@link SkipBadRecords#setMapperMaxSkipRecords(Configuration, long)}
+ +The skipping mode gets kicked off after certain no of failures + see {@link SkipBadRecords#setAttemptsToStartSkipping(Configuration, int)}
+ +In the skipping mode, the map/reduce task maintains the record range which + is getting processed at all times. Before giving the input to the + map/reduce function, it sends this record range to the Task tracker. + If task crashes, the Task tracker knows which one was the last reported + range. On further attempts that range get skipped.
]]> ++ TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null); ++ which will return : +
"attempt_[^_]*_[0-9]*_m_000001_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param isMap whether the tip is a map, or null + @param taskId taskId number, or null + @param attemptId the task attempt number, or null + @return a regex pattern matching TaskAttemptIDs]]> +
+ TaskAttemptID.getTaskAttemptIDsPattern(null, null, TaskType.MAP, 1, null); ++ which will return : +
"attempt_[^_]*_[0-9]*_m_000001_[0-9]*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param type the {@link TaskType} + @param taskId taskId number, or null + @param attemptId the task attempt number, or null + @return a regex pattern matching TaskAttemptIDs]]> +
attempt_200707121733_0003_m_000005_0
, which represents the
+ zeroth task attempt for the fifth map task in the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse TaskAttemptID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskID]]> +
+ TaskID.getTaskIDsPattern(null, null, true, 1); ++ which will return : +
"task_[^_]*_[0-9]*_m_000001*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param isMap whether the tip is a map, or null + @param taskId taskId number, or null + @return a regex pattern matching TaskIDs + @deprecated Use {@link TaskID#getTaskIDsPattern(String, Integer, TaskType, + Integer)}]]> +
+ TaskID.getTaskIDsPattern(null, null, true, 1); ++ which will return : +
"task_[^_]*_[0-9]*_m_000001*"+ @param jtIdentifier jobTracker identifier, or null + @param jobId job number, or null + @param type the {@link TaskType}, or null + @param taskId taskId number, or null + @return a regex pattern matching TaskIDs]]> +
task_200707121733_0003_m_000005
, which represents the
+ fifth map task in the third job running at the jobtracker
+ started at 200707121733
.
+ + Applications should never construct or parse TaskID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskAttemptID]]> +
) }]]> +
+ For the added Mapper the configuration given for it,
+ mapperConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain +
+
+ @param job job's JobConf to add the Mapper class.
+ @param klass the Mapper class to add.
+ @param inputKeyClass mapper input key class.
+ @param inputValueClass mapper input value class.
+ @param outputKeyClass mapper output key class.
+ @param outputValueClass mapper output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param mapperConf a JobConf with the configuration for the Mapper
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+
super.configure(...)
should be
+ invoked at the beginning of the overwriter method.]]>
+ super.close()
should be
+ invoked at the end of the overwriter method.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed in a chain. This enables having + reusable specialized Mappers that can be combined to perform composite + operations within a single task. +
+ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use maching output and input key and + value classes as no conversion is done by the chaining code. +
+ Using the ChainMapper and the ChainReducer classes is possible to compose
+ Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain. +
+ ChainMapper usage pattern: +
+
+ ... + conf.setJobName("chain"); + conf.setInputFormat(TextInputFormat.class); + conf.setOutputFormat(TextOutputFormat.class); + + JobConf mapAConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + JobConf mapBConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + JobConf reduceConf = new JobConf(false); + ... + ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + ... + + JobClient jc = new JobClient(conf); + RunningJob job = jc.submitJob(conf); + ... +]]> +
+ For the added Reducer the configuration given for it,
+ reducerConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainReducer, this is done by the setReducer or the addMapper for the last
+ element in the chain.
+
+ @param job job's JobConf to add the Reducer class.
+ @param klass the Reducer class to add.
+ @param inputKeyClass reducer input key class.
+ @param inputValueClass reducer input value class.
+ @param outputKeyClass reducer output key class.
+ @param outputValueClass reducer output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param reducerConf a JobConf with the configuration for the Reducer
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+
+ For the added Mapper the configuration given for it,
+ mapperConf
, have precedence over the job's JobConf. This
+ precedence is in effect when the task is running.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the
+ ChainMapper, this is done by the addMapper for the last mapper in the chain
+ .
+
+ @param job chain job's JobConf to add the Mapper class.
+ @param klass the Mapper class to add.
+ @param inputKeyClass mapper input key class.
+ @param inputValueClass mapper input value class.
+ @param outputKeyClass mapper output key class.
+ @param outputValueClass mapper output value class.
+ @param byValue indicates if key/values should be passed by value
+ to the next Mapper in the chain, if any.
+ @param mapperConf a JobConf with the configuration for the Mapper
+ class. It is recommended to use a JobConf without default values using the
+ JobConf(boolean loadDefaults)
constructor with FALSE.]]>
+
super.configure(...)
should be
+ invoked at the beginning of the overwriter method.]]>
+ map(...)
methods of the Mappers in the chain.]]>
+ super.close()
should be
+ invoked at the end of the overwriter method.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed after the Reducer or in a chain. + This enables having reusable specialized Mappers that can be combined to + perform composite operations within a single task. +
+ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use maching output and input key and + value classes as no conversion is done by the chaining code. +
+ Using the ChainMapper and the ChainReducer classes is possible to compose
+ Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainReducer, this is done by the setReducer or the addMapper for the last + element in the chain. +
+ ChainReducer usage pattern: +
+
+ ... + conf.setJobName("chain"); + conf.setInputFormat(TextInputFormat.class); + conf.setOutputFormat(TextOutputFormat.class); + + JobConf mapAConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + JobConf mapBConf = new JobConf(false); + ... + ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + JobConf reduceConf = new JobConf(false); + ... + ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + FileInputFormat.setInputPaths(conf, inDir); + FileOutputFormat.setOutputPath(conf, outDir); + ... + + JobClient jc = new JobClient(conf); + RunningJob job = jc.submitJob(conf); + ... +]]> +
CombineFileSplit
's.
+ @see CombineFileSplit]]>
+ SequenceFileInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ TextInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ false
+ if it is single. If the name output is not defined it returns
+ false
]]>
+ + MultipleOutputs supports counters, by default the are disabled. + The counters group is the {@link MultipleOutputs} class name. +
+ The names of the counters are the same as the named outputs. For multi + named outputs the name of the counter is the concatenation of the named + output, and underscore '_' and the multiname. + + @param conf job conf to enableadd the named output. + @param enabled indicates if the counters will be enabled or not.]]> ++ MultipleOutputs supports counters, by default the are disabled. + The counters group is the {@link MultipleOutputs} class name. +
+ The names of the counters are the same as the named outputs. For multi + named outputs the name of the counter is the concatenation of the named + output, and underscore '_' and the multiname. + + + @param conf job conf to enableadd the named output. + @return TRUE if the counters are enabled, FALSE if they are disabled.]]> +super.close()
at the
+ end of their close()
+
+ @throws java.io.IOException thrown if any of the MultipleOutput files
+ could not be closed properly.]]>
+ map()
and reduce()
methods of the
+ Mapper
and Reducer
implementations.
+
+ Each additional output, or named output, may be configured with its own
+ OutputFormat
, with its own key class and with its own value
+ class.
+
+ A named output can be a single file or a multi file. The later is refered as + a multi named output. +
+ A multi named output is an unbound set of files all sharing the same
+ OutputFormat
, key class and value class configuration.
+
+ When named outputs are used within a Mapper
implementation,
+ key/values written to a name output are not part of the reduce phase, only
+ key/values written to the job OutputCollector
are part of the
+ reduce phase.
+
+ MultipleOutputs supports counters, by default the are disabled. The counters + group is the {@link MultipleOutputs} class name. +
+ The names of the counters are the same as the named outputs. For multi + named outputs the name of the counter is the concatenation of the named + output, and underscore '_' and the multiname. ++ Job configuration usage pattern is: +
+ + JobConf conf = new JobConf(); + + conf.setInputPath(inDir); + FileOutputFormat.setOutputPath(conf, outDir); + + conf.setMapperClass(MOMap.class); + conf.setReducerClass(MOReduce.class); + ... + + // Defines additional single text based output 'text' for the job + MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, + LongWritable.class, Text.class); + + // Defines additional multi sequencefile based output 'sequence' for the + // job + MultipleOutputs.addMultiNamedOutput(conf, "seq", + SequenceFileOutputFormat.class, + LongWritable.class, Text.class); + ... + + JobClient jc = new JobClient(); + RunningJob job = jc.submitJob(conf); + + ... ++
+ Job configuration usage pattern is: +
+ + public class MOReduce implements + Reducer<WritableComparable, Writable> { + private MultipleOutputs mos; + + public void configure(JobConf conf) { + ... + mos = new MultipleOutputs(conf); + } + + public void reduce(WritableComparable key, Iterator<Writable> values, + OutputCollector output, Reporter reporter) + throws IOException { + ... + mos.getCollector("text", reporter).collect(key, new Text("Hello")); + mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye")); + mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau")); + ... + } + + public void close() throws IOException { + mos.close(); + ... + } + + } +]]> +
+ Map implementations using this MapRunnable must be thread-safe. +
+ The Map-Reduce job has to be configured to use this MapRunnable class (using
+ the JobConf.setMapRunnerClass method) and
+ the number of threads the thread-pool can use with the
+ mapred.map.multithreadedrunner.threads
property, its default
+ value is 10 threads.
+
]]> +
ClusterMetrics
provides clients with information such as:
+ Clients can query for the latest ClusterMetrics
, via
+ {@link Cluster#getClusterStatus()}.
Counters
represent global counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
is named by
+ an {@link Enum} and has a long for the value.
+
+ Counters
are bunched into Groups, each comprising of
+ counters from a particular Enum
class.]]>
+
Counters
holds per job/task counters, defined either by the
+ Map-Reduce framework or applications. Each Counter
can be of
+ any {@link Enum} type.
+
+ Counters
are bunched into {@link CounterGroup}s, each
+ comprising of counters from a particular Enum
class.]]>
+
Note: The split is a logical split of the inputs and the + input files are not physically split into chunks. For e.g. a split could + be <input-file-path, start, offset> tuple. The InputFormat + also creates the {@link RecordReader} to read the {@link InputSplit}. + + @param context job configuration. + @return an array of {@link InputSplit}s for the job.]]> +
The Map-Reduce framework relies on the InputFormat
of the
+ job to:
+
InputSplit
for processing by
+ the {@link Mapper}.
+ The default behavior of file-based {@link InputFormat}s, typically + sub-classes of {@link FileInputFormat}, is to split the + input into logical {@link InputSplit}s based on the total size, in + bytes, of the input files. However, the {@link FileSystem} blocksize of + the input files is treated as an upper bound for input splits. A lower bound + on the split size can be set via + + mapreduce.input.fileinputformat.split.minsize.
+ +Clearly, logical splits based on input-size is insufficient for many
+ applications since record boundaries are to respected. In such cases, the
+ application has to also implement a {@link RecordReader} on whom lies the
+ responsibility to respect record-boundaries and present a record-oriented
+ view of the logical InputSplit
to the individual task.
+
+ @see InputSplit
+ @see RecordReader
+ @see FileInputFormat]]>
+
Typically, it presents a byte-oriented view on the input and is the + responsibility of {@link RecordReader} of the job to process this and present + a record-oriented view. + + @see InputFormat + @see RecordReader]]> +
Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ A Cluster will be created from the conf parameter only when it's needed.
+
+ @param conf the configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param conf the configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param status job status
+ @param conf job configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param ignored
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException
+ @deprecated Use {@link #getInstance()}]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param ignored
+ @param conf job configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException
+ @deprecated Use {@link #getInstance(Configuration)}]]>
+ Configuration
so
+ that any necessary internal modifications do not reflect on the incoming
+ parameter.
+
+ @param cluster cluster
+ @param status job status
+ @param conf job configuration
+ @return the {@link Job} , with no connection to a cluster yet.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ false
.
+ @throws IOException]]>
+ false
.]]>
+ false
.]]>
+ false
.]]>
+ + Normally the user creates the application, describes various facets of the + job via {@link Job} and then submits the job and monitor its progress.
+ +Here is an example on how to submit a job:
+]]> ++ // Create a new Job + Job job = Job.getInstance(); + job.setJarByClass(MyJob.class); + + // Specify various job-specific parameters + job.setJobName("myjob"); + + job.setInputPath(new Path("in")); + job.setOutputPath(new Path("out")); + + job.setMapperClass(MyJob.MyMapper.class); + job.setReducerClass(MyJob.MyReducer.class); + + // Submit the job, then poll for progress until the job is complete + job.waitForCompletion(true); +
job_200707121733_0003
, which represents the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse JobID strings, but rather + use appropriate constructors or {@link #forName(String)} method. + + @see TaskID + @see TaskAttemptID]]> +
The Hadoop Map-Reduce framework spawns one map task for each
+ {@link InputSplit} generated by the {@link InputFormat} for the job.
+ Mapper
implementations can access the {@link Configuration} for
+ the job via the {@link JobContext#getConfiguration()}.
+
+
The framework first calls
+ {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
+ {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)}
+ for each key/value pair in the InputSplit
. Finally
+ {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.
All intermediate values associated with a given output key are + subsequently grouped by the framework, and passed to a {@link Reducer} to + determine the final output. Users can control the sorting and grouping by + specifying two key {@link RawComparator} classes.
+ +The Mapper
outputs are partitioned per
+ Reducer
. Users can control which keys (and hence records) go to
+ which Reducer
by implementing a custom {@link Partitioner}.
+
+
Users can optionally specify a combiner
, via
+ {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
+ intermediate outputs, which helps to cut down the amount of data transferred
+ from the Mapper
to the Reducer
.
+
+
Applications can specify if and how the intermediate
+ outputs are to be compressed and which {@link CompressionCodec}s are to be
+ used via the Configuration
.
If the job has zero
+ reduces then the output of the Mapper
is directly written
+ to the {@link OutputFormat} without sorting by keys.
Example:
++ ++ public class TokenCounterMapper + extends Mapper<Object, Text, Text, IntWritable>{ + + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(Object key, Text value, Context context) throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, one); + } + } + } +
Applications may override the
+ {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert
+ greater control on map processing e.g. multi-threaded Mapper
s
+ etc.
false
otherwise
+ @see #recoverTask(TaskAttemptContext)
+ @deprecated Use {@link #isRecoverySupported(JobContext)} instead.]]>
+ false
otherwise
+ @throws IOException
+ @see #recoverTask(TaskAttemptContext)]]>
+ The Map-Reduce framework relies on the OutputCommitter
of
+ the job to:
+
The Map-Reduce framework relies on the OutputFormat
of the
+ job to:
+
key
.]]>
+ Partitioner
controls the partitioning of the keys of the
+ intermediate map-outputs. The key (or a subset of the key) is used to derive
+ the partition, typically by a hash function. The total number of partitions
+ is the same as the number of reduce tasks for the job. Hence this controls
+ which of the m
reduce tasks the intermediate key (and hence the
+ record) is sent for reduction.
+
+ Note: If you require your Partitioner class to obtain the Job's configuration
+ object, implement the {@link Configurable} interface.
+
+ @see Reducer]]>
+ RecordWriter
implementations write the job outputs to the
+ {@link FileSystem}.
+
+ @see OutputFormat]]>
+
Reducer
implementations
+ can access the {@link Configuration} for the job via the
+ {@link JobContext#getConfiguration()} method.
+
+ Reducer
has 3 primary phases:
The Reducer
copies the sorted output from each
+ {@link Mapper} using HTTP across the network.
The framework merge sorts Reducer
inputs by
+ key
s
+ (since different Mapper
s may have output the same key).
The shuffle and sort phases occur simultaneously i.e. while outputs are + being fetched they are merged.
+ + SecondarySort + +To achieve a secondary sort on the values returned by the value + iterator, the application should extend the key with the secondary + key and define a grouping comparator. The keys will be sorted using the + entire key, but will be grouped using the grouping comparator to decide + which keys and values are sent in the same call to reduce.The grouping + comparator is specified via + {@link Job#setGroupingComparatorClass(Class)}. The sort order is + controlled by + {@link Job#setSortComparatorClass(Class)}.
+ + + For example, say that you want to find duplicate web pages and tag them + all with the url of the "best" known example. You would set up the job + like: +In this phase the
+ {@link #reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)}
+ method is called for each <key, (collection of values)>
in
+ the sorted inputs.
The output of the reduce task is typically written to a + {@link RecordWriter} via + {@link Context#write(Object, Object)}.
+The output of the Reducer
is not re-sorted.
Example:
++ + @see Mapper + @see Partitioner]]> ++ public class IntSumReducer<Key> extends Reducer<Key,IntWritable, + Key,IntWritable> { + private IntWritable result = new IntWritable(); + + public void reduce(Key key, Iterable<IntWritable> values, + Context context) throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + } + } +
Counter
for the given counterName
]]>
+ counterName
.
+ @param counterName counter name
+ @return the Counter
for the given groupName
and
+ counterName
]]>
+ attempt_200707121733_0003_m_000005_0
, which represents the
+ zeroth task attempt for the fifth map task in the third job
+ running at the jobtracker started at 200707121733
.
+ + Applications should never construct or parse TaskAttemptID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskID]]> +
task_200707121733_0003_m_000005
, which represents the
+ fifth map task in the third job running at the jobtracker
+ started at 200707121733
.
+ + Applications should never construct or parse TaskID strings + , but rather use appropriate constructors or {@link #forName(String)} + method. + + @see JobID + @see TaskAttemptID]]> +
mapperConf
, have precedence over the job's Configuration. This
+ precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain +
+ + @param job + The job. + @param klass + the Mapper class to add. + @param inputKeyClass + mapper input key class. + @param inputValueClass + mapper input value class. + @param outputKeyClass + mapper output key class. + @param outputValueClass + mapper output value class. + @param mapperConf + a configuration for the Mapper class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed in a chain. This enables having + reusable specialized Mappers that can be combined to perform composite + operations within a single task. +
++ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use matching output and input key and + value classes as no conversion is done by the chaining code. +
+
+ Using the ChainMapper and the ChainReducer classes is possible to compose
+ Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the chain. +
+ ChainMapper usage pattern: ++ +
+ ... + Job = new Job(conf); + + Configuration mapAConf = new Configuration(false); + ... + ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class, + Text.class, Text.class, true, mapAConf); + + Configuration mapBConf = new Configuration(false); + ... + ChainMapper.addMapper(job, BMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, mapBConf); + + ... + + job.waitForComplettion(true); + ... +]]> +
reducerConf
, have precedence over the job's Configuration.
+ This precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainReducer, this is done by the setReducer or the addMapper for the last + element in the chain. +
+ + @param job + the job + @param klass + the Reducer class to add. + @param inputKeyClass + reducer input key class. + @param inputValueClass + reducer input value class. + @param outputKeyClass + reducer output key class. + @param outputValueClass + reducer output value class. + @param reducerConf + a configuration for the Reducer class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ mapperConf
, have precedence over the job's Configuration. This
+ precedence is in effect when the task is running.
+
+ + IMPORTANT: There is no need to specify the output key/value classes for the + ChainMapper, this is done by the addMapper for the last mapper in the + chain. +
+ + @param job + The job. + @param klass + the Mapper class to add. + @param inputKeyClass + mapper input key class. + @param inputValueClass + mapper input value class. + @param outputKeyClass + mapper output key class. + @param outputValueClass + mapper output value class. + @param mapperConf + a configuration for the Mapper class. It is recommended to use a + Configuration without default values using the +Configuration(boolean loadDefaults)
constructor with
+ FALSE.]]>
+ + The key functionality of this feature is that the Mappers in the chain do not + need to be aware that they are executed after the Reducer or in a chain. This + enables having reusable specialized Mappers that can be combined to perform + composite operations within a single task. +
++ Special care has to be taken when creating chains that the key/values output + by a Mapper are valid for the following Mapper in the chain. It is assumed + all Mappers and the Reduce in the chain use matching output and input key and + value classes as no conversion is done by the chaining code. +
+ Using the ChainMapper and the ChainReducer classes is possible to
+ compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]
. And
+ immediate benefit of this pattern is a dramatic reduction in disk IO.
+ IMPORTANT: There is no need to specify the output key/value classes for the + ChainReducer, this is done by the setReducer or the addMapper for the last + element in the chain. +
+ ChainReducer usage pattern: ++ +
+ ... + Job = new Job(conf); + .... + + Configuration reduceConf = new Configuration(false); + ... + ChainReducer.setReducer(job, XReduce.class, LongWritable.class, Text.class, + Text.class, Text.class, true, reduceConf); + + ChainReducer.addMapper(job, CMap.class, Text.class, Text.class, + LongWritable.class, Text.class, false, null); + + ChainReducer.addMapper(job, DMap.class, LongWritable.class, Text.class, + LongWritable.class, LongWritable.class, true, null); + + ... + + job.waitForCompletion(true); + ... +]]> +
+ Implementations are responsible for writing the fields of the object + to PreparedStatement, and reading the fields of the object from the + ResultSet. + +
Example:
+ If we have the following table in the database : ++ CREATE TABLE MyTable ( + counter INTEGER NOT NULL, + timestamp BIGINT NOT NULL, + ); ++ then we can read/write the tuples from/to the table with : +
+ public class MyWritable implements Writable, DBWritable { + // Some data + private int counter; + private long timestamp; + + //Writable#write() implementation + public void write(DataOutput out) throws IOException { + out.writeInt(counter); + out.writeLong(timestamp); + } + + //Writable#readFields() implementation + public void readFields(DataInput in) throws IOException { + counter = in.readInt(); + timestamp = in.readLong(); + } + + public void write(PreparedStatement statement) throws SQLException { + statement.setInt(1, counter); + statement.setLong(2, timestamp); + } + + public void readFields(ResultSet resultSet) throws SQLException { + counter = resultSet.getInt(1); + timestamp = resultSet.getLong(2); + } + } +]]> +
CombineFileSplit
's.
+
+ @see CombineFileSplit]]>
+ SequenceFileInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ TextInputFormat
.
+
+ @see CombineFileInputFormat]]>
+ false
to ensure that individual input files are never split-up
+ so that {@link Mapper}s process entire files.
+
+ @param context the job context
+ @param filename the file name to check
+ @return is this file splitable?]]>
+ FileInputFormat
is the base class for all file-based
+ InputFormat
s. This provides a generic implementation of
+ {@link #getSplits(JobContext)}.
+ Subclasses of FileInputFormat
can also override the
+ {@link #isSplitable(JobContext, Path)} method to ensure input-files are
+ not split-up and are processed as a whole by {@link Mapper}s.]]>
+ ) }]]> +
+ Mapper implementations using this MapRunnable must be thread-safe. +
+ The Map-Reduce job has to be configured with the mapper to use via + {@link #setMapperClass(Job, Class)} and + the number of thread the thread-pool can use with the + {@link #getNumberOfThreads(JobContext)} method. The default + value is 10 threads. +
]]> +
Mapper.Context
for custom implementations]]>
+ false
otherwise]]>
+ Some applications need to create/write-to side-files, which differ from + the actual job-outputs. + +
In such cases there could be issues with 2 instances of the same TIP + (running simultaneously e.g. speculative tasks) trying to open/write-to the + same file (path) on HDFS. Hence the application-writer will have to pick + unique names per task-attempt (e.g. using the attemptid, say + attempt_200709221812_0001_m_000000_0), not just per TIP.
+ +To get around this the Map-Reduce framework helps the application-writer + out by maintaining a special + ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} + sub-directory for each task-attempt on HDFS where the output of the + task-attempt goes. On successful completion of the task-attempt the files + in the ${mapreduce.output.fileoutputformat.outputdir}/_temporary/_${taskid} (only) + are promoted to ${mapreduce.output.fileoutputformat.outputdir}. Of course, the + framework discards the sub-directory of unsuccessful task-attempts. This + is completely transparent to the application.
+ +The application-writer can take advantage of this by creating any + side-files required in a work directory during execution + of his task i.e. via + {@link #getWorkOutputPath(TaskInputOutputContext)}, and + the framework will move them out similarly - thus she doesn't have to pick + unique paths per task-attempt.
+ +The entire discussion holds true for maps of jobs with + reducer=NONE (i.e. 0 reduces) since output of the map, in that case, + goes directly to HDFS.
+ + @return the {@link Path} to the task's temporary output directory + for the map-reduce job.]]> +This method uses the {@link #getUniqueFile} method to make the file name + unique for the task.
+ + @param context the context for the task. + @param name the name for the file. + @param extension the extension for the file + @return a unique path accross all tasks of the job.]]> +close()
]]>
+ OutputFormat
, with its own key class and with its own value
+ class.
+
+
+ + Case two: to write data to different files provided by user +
+ ++ MultipleOutputs supports counters, by default they are disabled. The + counters group is the {@link MultipleOutputs} class name. The names of the + counters are the same as the output name. These count the number records + written to each output name. +
+ + Usage pattern for job submission: ++ + Job job = new Job(); + + FileInputFormat.setInputPath(job, inDir); + FileOutputFormat.setOutputPath(job, outDir); + + job.setMapperClass(MOMap.class); + job.setReducerClass(MOReduce.class); + ... + + // Defines additional single text based output 'text' for the job + MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, + LongWritable.class, Text.class); + + // Defines additional sequence-file based output 'sequence' for the job + MultipleOutputs.addNamedOutput(job, "seq", + SequenceFileOutputFormat.class, + LongWritable.class, Text.class); + ... + + job.waitForCompletion(true); + ... ++
+ Usage in Reducer: +
+ <K, V> String generateFileName(K k, V v) { + return k.toString() + "_" + v.toString(); + } + + public class MOReduce extends + Reducer<WritableComparable, Writable,WritableComparable, Writable> { + private MultipleOutputs mos; + public void setup(Context context) { + ... + mos = new MultipleOutputs(context); + } + + public void reduce(WritableComparable key, Iterator<Writable> values, + Context context) + throws IOException { + ... + mos.write("text", , key, new Text("Hello")); + mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a"); + mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b"); + mos.write(key, new Text("value"), generateFileName(key, new Text("value"))); + ... + } + + public void cleanup(Context) throws IOException { + mos.close(); + ... + } + + } ++ +
+ When used in conjuction with org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat, + MultipleOutputs can mimic the behaviour of MultipleTextOutputFormat and MultipleSequenceFileOutputFormat + from the old Hadoop API - ie, output can be written from the Reducer to more than one location. +
+ +
+ Use MultipleOutputs.write(KEYOUT key, VALUEOUT value, String baseOutputPath)
to write key and
+ value to a path specified by baseOutputPath
, with no need to specify a named output:
+
+ private MultipleOutputs<Text, Text> out; + + public void setup(Context context) { + out = new MultipleOutputs<Text, Text>(context); + ... + } + + public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + for (Text t : values) { + out.write(key, t, generateFileName(<parameter list...>)); + } + } + + protected void cleanup(Context context) throws IOException, InterruptedException { + out.close(); + } ++ +
+ Use your own code in generateFileName()
to create a custom path to your results.
+ '/' characters in baseOutputPath
will be translated into directory levels in your file system.
+ Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc.
+ No call to context.write()
is necessary. See example generateFileName()
code below.
+
+ private String generateFileName(Text k) { + // expect Text k in format "Surname|Forename" + String[] kStr = k.toString().split("\\|"); + + String sName = kStr[0]; + String fName = kStr[1]; + + // example for k = Smith|John + // output written to /user/hadoop/path/to/output/Smith/John-r-00000 (etc) + return sName + "/" + fName; + } ++ +
+ Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000.
+ To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+ instead of job.setOutputFormatClass(TextOutputFormat.class);
in your Hadoop job configuration.
+
The subarray to be used for the partitioning can be defined by means + of the following properties: +
+ +---+---+---+---+---+
+ | B | B | B | B | B |
+ +---+---+---+---+---+
+ 0 1 2 3 4
+ -5 -4 -3 -2 -1
+
+ The first row of numbers gives the position of the offsets 0...5 in
+ the array; the second row gives the corresponding negative offsets.
+ Contrary to Python, the specified subarray has byte i
+ and j
as first and last element, repectively, when
+ i
and j
are the left and right offset.
+
+ For Hadoop programs written in Java, it is advisable to use one of + the following static convenience methods for setting the offsets: +
Reducer.Context
for custom implementations]]>
+