From ae353ea96993ec664090c5d84f6675c29d9f0f5f Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 24 May 2016 19:47:50 -0700 Subject: [PATCH] MAPREDUCE-6703. Add flag to allow MapReduce AM to request for OPPORTUNISTIC containers. Contributed by Arun Suresh --- .../v2/app/rm/RMContainerAllocator.java | 176 ++++++----- .../v2/app/rm/RMContainerRequestor.java | 17 +- .../apache/hadoop/mapreduce/MRJobConfig.java | 11 + .../mapred/TestMROpportunisticMaps.java | 281 ++++++++++++++++++ 4 files changed, 416 insertions(+), 69 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index d2d1b4ecec..94b5c169da 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -104,6 +104,7 @@ public class RMContainerAllocator extends RMContainerRequestor static final Priority PRIORITY_FAST_FAIL_MAP; static final Priority PRIORITY_REDUCE; static final Priority PRIORITY_MAP; + static final Priority PRIORITY_OPPORTUNISTIC_MAP; @VisibleForTesting public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted " @@ -119,6 +120,10 @@ public class RMContainerAllocator extends RMContainerRequestor PRIORITY_REDUCE.setPriority(10); PRIORITY_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class); PRIORITY_MAP.setPriority(20); + PRIORITY_OPPORTUNISTIC_MAP = + RecordFactoryProvider.getRecordFactory(null).newRecordInstance( + Priority.class); + PRIORITY_OPPORTUNISTIC_MAP.setPriority(19); } /* @@ -226,6 +231,9 @@ protected void serviceInit(Configuration conf) throws Exception { // Init startTime to current time. If all goes well, it will be reset after // first attempt to contact RM. retrystartTime = System.currentTimeMillis(); + this.scheduledRequests.setNumOpportunisticMapsPer100( + conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100, + MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100)); } @Override @@ -852,6 +860,8 @@ private void applyConcurrentTaskLimits() { setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest, failedMapRequestLimit); setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit); + setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP, mapResourceRequest, + normalMapRequestLimit); } int numScheduledReduces = scheduledRequests.reduces.size(); @@ -979,6 +989,12 @@ class ScheduledRequests { @VisibleForTesting final Map maps = new LinkedHashMap(); + int mapsMod100 = 0; + int numOpportunisticMapsPer100 = 0; + + void setNumOpportunisticMapsPer100(int numMaps) { + this.numOpportunisticMapsPer100 = numMaps; + } @VisibleForTesting final LinkedHashMap reduces = @@ -1020,34 +1036,47 @@ void addMap(ContainerRequestEvent event) { new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP, mapNodeLabelExpression); LOG.info("Added "+event.getAttemptID()+" to list of failed maps"); + // If its an earlier Failed attempt, do not retry as OPPORTUNISTIC + maps.put(event.getAttemptID(), request); + addContainerReq(request); } else { - for (String host : event.getHosts()) { - LinkedList list = mapsHostMapping.get(host); - if (list == null) { - list = new LinkedList(); - mapsHostMapping.put(host, list); + if (mapsMod100 < numOpportunisticMapsPer100) { + request = + new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP, + mapNodeLabelExpression); + maps.put(event.getAttemptID(), request); + addOpportunisticResourceRequest(request.priority, request.capability); + } else { + request = + new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression); + for (String host : event.getHosts()) { + LinkedList list = mapsHostMapping.get(host); + if (list == null) { + list = new LinkedList(); + mapsHostMapping.put(host, list); + } + list.add(event.getAttemptID()); + if (LOG.isDebugEnabled()) { + LOG.debug("Added attempt req to host " + host); + } } - list.add(event.getAttemptID()); - if (LOG.isDebugEnabled()) { - LOG.debug("Added attempt req to host " + host); + for (String rack : event.getRacks()) { + LinkedList list = mapsRackMapping.get(rack); + if (list == null) { + list = new LinkedList(); + mapsRackMapping.put(rack, list); + } + list.add(event.getAttemptID()); + if (LOG.isDebugEnabled()) { + LOG.debug("Added attempt req to rack " + rack); + } } - } - for (String rack: event.getRacks()) { - LinkedList list = mapsRackMapping.get(rack); - if (list == null) { - list = new LinkedList(); - mapsRackMapping.put(rack, list); - } - list.add(event.getAttemptID()); - if (LOG.isDebugEnabled()) { - LOG.debug("Added attempt req to rack " + rack); - } - } - request = - new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression); + maps.put(event.getAttemptID(), request); + addContainerReq(request); + } + mapsMod100++; + mapsMod100 %= 100; } - maps.put(event.getAttemptID(), request); - addContainerReq(request); } @@ -1077,7 +1106,8 @@ private void assign(List allocatedContainers) { Priority priority = allocated.getPriority(); Resource allocatedResource = allocated.getResource(); if (PRIORITY_FAST_FAIL_MAP.equals(priority) - || PRIORITY_MAP.equals(priority)) { + || PRIORITY_MAP.equals(priority) + || PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) { if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource, mapResourceRequest, getSchedulerResourceTypes()) <= 0 || maps.isEmpty()) { @@ -1235,7 +1265,8 @@ private ContainerRequest getContainerReqToReplace(Container allocated) { LOG.info("Found replacement: " + toBeReplaced); return toBeReplaced; } - else if (PRIORITY_MAP.equals(priority)) { + else if (PRIORITY_MAP.equals(priority) + || PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) { LOG.info("Replacing MAP container " + allocated.getId()); // allocated container was for a map String host = allocated.getNodeId().getHost(); @@ -1298,29 +1329,33 @@ private void assignMapsWithLocality(List allocatedContainers) { while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ Container allocated = it.next(); Priority priority = allocated.getPriority(); - assert PRIORITY_MAP.equals(priority); - // "if (maps.containsKey(tId))" below should be almost always true. - // hence this while loop would almost always have O(1) complexity - String host = allocated.getNodeId().getHost(); - LinkedList list = mapsHostMapping.get(host); - while (list != null && list.size() > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Host matched to the request list " + host); - } - TaskAttemptId tId = list.removeFirst(); - if (maps.containsKey(tId)) { - ContainerRequest assigned = maps.remove(tId); - containerAssigned(allocated, assigned); - it.remove(); - JobCounterUpdateEvent jce = - new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); - jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1); - eventHandler.handle(jce); - hostLocalAssigned++; + assert (PRIORITY_MAP.equals(priority) + || PRIORITY_OPPORTUNISTIC_MAP.equals(priority)); + if (!PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) { + // "if (maps.containsKey(tId))" below should be almost always true. + // hence this while loop would almost always have O(1) complexity + String host = allocated.getNodeId().getHost(); + LinkedList list = mapsHostMapping.get(host); + while (list != null && list.size() > 0) { if (LOG.isDebugEnabled()) { - LOG.debug("Assigned based on host match " + host); + LOG.debug("Host matched to the request list " + host); + } + TaskAttemptId tId = list.removeFirst(); + if (maps.containsKey(tId)) { + ContainerRequest assigned = maps.remove(tId); + containerAssigned(allocated, assigned); + it.remove(); + JobCounterUpdateEvent jce = + new JobCounterUpdateEvent(assigned.attemptID.getTaskId() + .getJobId()); + jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1); + eventHandler.handle(jce); + hostLocalAssigned++; + if (LOG.isDebugEnabled()) { + LOG.debug("Assigned based on host match " + host); + } + break; } - break; } } } @@ -1330,27 +1365,31 @@ private void assignMapsWithLocality(List allocatedContainers) { while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ Container allocated = it.next(); Priority priority = allocated.getPriority(); - assert PRIORITY_MAP.equals(priority); - // "if (maps.containsKey(tId))" below should be almost always true. - // hence this while loop would almost always have O(1) complexity - String host = allocated.getNodeId().getHost(); - String rack = RackResolver.resolve(host).getNetworkLocation(); - LinkedList list = mapsRackMapping.get(rack); - while (list != null && list.size() > 0) { - TaskAttemptId tId = list.removeFirst(); - if (maps.containsKey(tId)) { - ContainerRequest assigned = maps.remove(tId); - containerAssigned(allocated, assigned); - it.remove(); - JobCounterUpdateEvent jce = - new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId()); - jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1); - eventHandler.handle(jce); - rackLocalAssigned++; - if (LOG.isDebugEnabled()) { - LOG.debug("Assigned based on rack match " + rack); + assert (PRIORITY_MAP.equals(priority) + || PRIORITY_OPPORTUNISTIC_MAP.equals(priority)); + if (!PRIORITY_OPPORTUNISTIC_MAP.equals(priority)) { + // "if (maps.containsKey(tId))" below should be almost always true. + // hence this while loop would almost always have O(1) complexity + String host = allocated.getNodeId().getHost(); + String rack = RackResolver.resolve(host).getNetworkLocation(); + LinkedList list = mapsRackMapping.get(rack); + while (list != null && list.size() > 0) { + TaskAttemptId tId = list.removeFirst(); + if (maps.containsKey(tId)) { + ContainerRequest assigned = maps.remove(tId); + containerAssigned(allocated, assigned); + it.remove(); + JobCounterUpdateEvent jce = + new JobCounterUpdateEvent(assigned.attemptID.getTaskId() + .getJobId()); + jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1); + eventHandler.handle(jce); + rackLocalAssigned++; + if (LOG.isDebugEnabled()) { + LOG.debug("Assigned based on rack match " + rack); + } + break; } - break; } } } @@ -1360,7 +1399,8 @@ private void assignMapsWithLocality(List allocatedContainers) { while(it.hasNext() && maps.size() > 0 && canAssignMaps()){ Container allocated = it.next(); Priority priority = allocated.getPriority(); - assert PRIORITY_MAP.equals(priority); + assert (PRIORITY_MAP.equals(priority) + || PRIORITY_OPPORTUNISTIC_MAP.equals(priority)); TaskAttemptId tId = maps.keySet().iterator().next(); ContainerRequest assigned = maps.remove(tId); containerAssigned(allocated, assigned); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 0e87f29017..7030712d58 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; @@ -389,7 +390,7 @@ protected void containerFailedOnHost(String hostName) { protected Resource getAvailableResources() { return availableResources == null ? Resources.none() : availableResources; } - + protected void addContainerReq(ContainerRequest req) { // Create resource requests for (String host : req.hosts) { @@ -424,8 +425,21 @@ protected void decContainerReq(ContainerRequest req) { decResourceRequest(req.priority, ResourceRequest.ANY, req.capability); } + protected void addOpportunisticResourceRequest(Priority priority, + Resource capability) { + addResourceRequest(priority, ResourceRequest.ANY, capability, null, + ExecutionType.OPPORTUNISTIC); + } + private void addResourceRequest(Priority priority, String resourceName, Resource capability, String nodeLabelExpression) { + addResourceRequest(priority, resourceName, capability, nodeLabelExpression, + ExecutionType.GUARANTEED); + } + + private void addResourceRequest(Priority priority, String resourceName, + Resource capability, String nodeLabelExpression, + ExecutionType executionType) { Map> remoteRequests = this.remoteRequestsTable.get(priority); if (remoteRequests == null) { @@ -448,6 +462,7 @@ private void addResourceRequest(Priority priority, String resourceName, remoteRequest.setCapability(capability); remoteRequest.setNumContainers(0); remoteRequest.setNodeLabelExpression(nodeLabelExpression); + remoteRequest.setExecutionType(executionType); reqMap.put(capability, remoteRequest); } remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index f75a3b91b4..be47bfcda8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -966,4 +966,15 @@ public interface MRJobConfig { public static final int DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB = 128; + /** + * Number of OPPORTUNISTIC Containers per 100 containers that will be + * requested by the MRAppMaster. The Default value is 0, which implies all + * maps will be guaranteed. A value of 100 means all maps will be requested + * as opportunistic. For any other value say 'x', the FIRST 'x' maps + * requested by the AM will be opportunistic. If the total number of maps + * for the job is less than 'x', then ALL maps will be OPPORTUNISTIC + */ + public static final String MR_NUM_OPPORTUNISTIC_MAPS_PER_100 = + "mapreduce.job.num-opportunistic-maps-per-100"; + public static final int DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PER_100 = 0; } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java new file mode 100644 index 0000000000..dfe85f27c0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMROpportunisticMaps.java @@ -0,0 +1,281 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; + +import static org.junit.Assert.*; + +/** + * Simple MapReduce to test ability of the MRAppMaster to request and use + * OPPORTUNISTIC containers. + * This test runs a simple external merge sort using MapReduce. + * The Hadoop framework's merge on the reduce side will merge the partitions + * created to generate the final output which is sorted on the key. + */ +@SuppressWarnings(value={"unchecked", "deprecation"}) +public class TestMROpportunisticMaps { + // Where MR job's input will reside. + private static final Path INPUT_DIR = new Path("/test/input"); + // Where output goes. + private static final Path OUTPUT = new Path("/test/output"); + + /** + * Test will run with 4 Maps, All OPPORTUNISTIC. + * @throws Exception + */ + @Test + public void testAllOpportunisticMaps() throws Exception { + doTest(4, 1, 1, 4); + } + + /** + * Test will run with 4 Maps, 2 OPPORTUNISTIC and 2 GUARANTEED. + * @throws Exception + */ + @Test + public void testHalfOpportunisticMaps() throws Exception { + doTest(4, 1, 1, 2); + } + + /** + * Test will run with 6 Maps and 2 Reducers. All the Maps are OPPORTUNISTIC. + * @throws Exception + */ + @Test + public void testMultipleReducers() throws Exception { + doTest(6, 2, 1, 6); + } + + public void doTest(int numMappers, int numReducers, int numNodes, + int percent) throws Exception { + doTest(numMappers, numReducers, numNodes, 1000, percent); + } + + public void doTest(int numMappers, int numReducers, int numNodes, + int numLines, int percent) throws Exception { + MiniDFSCluster dfsCluster = null; + MiniMRClientCluster mrCluster = null; + FileSystem fileSystem = null; + try { + Configuration conf = new Configuration(); + // Start the mini-MR and mini-DFS clusters + conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); + conf.setBoolean(YarnConfiguration.DIST_SCHEDULING_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED, true); + dfsCluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numNodes).build(); + fileSystem = dfsCluster.getFileSystem(); + mrCluster = MiniMRClientClusterFactory.create(this.getClass(), + numNodes, conf); + // Generate input. + createInput(fileSystem, numMappers, numLines); + // Run the test. + + Configuration jobConf = mrCluster.getConfig(); + jobConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, + YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); + + runMergeTest(new JobConf(jobConf), fileSystem, + numMappers, numReducers, numLines, percent); + } finally { + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + if (mrCluster != null) { + mrCluster.stop(); + } + } + } + + private void createInput(FileSystem fs, int numMappers, int numLines) + throws Exception { + fs.delete(INPUT_DIR, true); + for (int i = 0; i < numMappers; i++) { + OutputStream os = fs.create(new Path(INPUT_DIR, "input_" + i + ".txt")); + Writer writer = new OutputStreamWriter(os); + for (int j = 0; j < numLines; j++) { + // Create sorted key, value pairs. + int k = j + 1; + String formattedNumber = String.format("%09d", k); + writer.write(formattedNumber + " " + formattedNumber + "\n"); + } + writer.close(); + } + } + + private void runMergeTest(JobConf job, FileSystem fileSystem, int + numMappers, int numReducers, int numLines, int percent) + throws Exception { + fileSystem.delete(OUTPUT, true); + job.setJobName("Test"); + JobClient client = new JobClient(job); + RunningJob submittedJob = null; + FileInputFormat.setInputPaths(job, INPUT_DIR); + FileOutputFormat.setOutputPath(job, OUTPUT); + job.set("mapreduce.output.textoutputformat.separator", " "); + job.setInputFormat(TextInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + job.setMapperClass(MyMapper.class); + job.setPartitionerClass(MyPartitioner.class); + job.setOutputFormat(TextOutputFormat.class); + job.setNumReduceTasks(numReducers); + + // All OPPORTUNISTIC + job.setInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PER_100, percent); + job.setInt("mapreduce.map.maxattempts", 1); + job.setInt("mapreduce.reduce.maxattempts", 1); + job.setInt("mapred.test.num_lines", numLines); + job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + try { + submittedJob = client.submitJob(job); + try { + if (!client.monitorAndPrintJob(job, submittedJob)) { + throw new IOException("Job failed!"); + } + } catch(InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } catch(IOException ioe) { + System.err.println("Job failed with: " + ioe); + } finally { + verifyOutput(fileSystem, numMappers, numLines); + } + } + + private void verifyOutput(FileSystem fileSystem, int numMappers, int numLines) + throws Exception { + FSDataInputStream dis = null; + long numValidRecords = 0; + long numInvalidRecords = 0; + String prevKeyValue = "000000000"; + Path[] fileList = + FileUtil.stat2Paths(fileSystem.listStatus(OUTPUT, + new Utils.OutputFileUtils.OutputFilesFilter())); + for (Path outFile : fileList) { + try { + dis = fileSystem.open(outFile); + String record; + while((record = dis.readLine()) != null) { + // Split the line into key and value. + int blankPos = record.indexOf(" "); + String keyString = record.substring(0, blankPos); + String valueString = record.substring(blankPos+1); + // Check for sorted output and correctness of record. + if (keyString.compareTo(prevKeyValue) >= 0 + && keyString.equals(valueString)) { + prevKeyValue = keyString; + numValidRecords++; + } else { + numInvalidRecords++; + } + } + } finally { + if (dis != null) { + dis.close(); + dis = null; + } + } + } + // Make sure we got all input records in the output in sorted order. + assertEquals((long)(numMappers * numLines), numValidRecords); + // Make sure there is no extraneous invalid record. + assertEquals(0, numInvalidRecords); + } + + /** + * A mapper implementation that assumes that key text contains valid integers + * in displayable form. + */ + public static class MyMapper extends MapReduceBase + implements Mapper { + private Text keyText; + private Text valueText; + + public MyMapper() { + keyText = new Text(); + valueText = new Text(); + } + + @Override + public void map(LongWritable key, Text value, + OutputCollector output, + Reporter reporter) throws IOException { + String record = value.toString(); + int blankPos = record.indexOf(" "); + keyText.set(record.substring(0, blankPos)); + valueText.set(record.substring(blankPos+1)); + output.collect(keyText, valueText); + } + + public void close() throws IOException { + } + } + + /** + * Partitioner implementation to make sure that output is in total sorted + * order. We basically route key ranges to different reducers such that + * key values monotonically increase with the partition number. For example, + * in a test with 4 reducers, the keys are numbers from 1 to 1000 in the + * form "000000001" to "000001000" in each input file. The keys "000000001" + * to "000000250" are routed to partition 0, "000000251" to "000000500" are + * routed to partition 1. + */ + static class MyPartitioner implements Partitioner { + + private JobConf job; + + public MyPartitioner() { + } + + public void configure(JobConf jobConf) { + this.job = jobConf; + } + + public int getPartition(Text key, Text value, int numPartitions) { + int keyValue = 0; + try { + keyValue = Integer.parseInt(key.toString()); + } catch(NumberFormatException nfe) { + keyValue = 0; + } + int partitionNumber = (numPartitions * (Math.max(0, keyValue - 1))) / + job.getInt("mapred.test.num_lines", 10000); + return partitionNumber; + } + } + +} \ No newline at end of file