From 381a4c42135916245c8992daa3d03f38e282108d Mon Sep 17 00:00:00 2001 From: Siddharth Seth Date: Thu, 1 Aug 2013 17:42:24 +0000 Subject: [PATCH] MAPREDUCE-5352. Optimize node local splits generated by CombineFileInputFormat. (sseth) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1509345 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../lib/input/CombineFileInputFormat.java | 192 +++++++++++------- .../lib/input/TestCombineFileInputFormat.java | 89 +++++++- 3 files changed, 202 insertions(+), 82 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index a174eb366c..d0f99ff6b4 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -195,6 +195,9 @@ Release 2.1.1-beta - UNRELEASED OPTIMIZATIONS + MAPREDUCE-5352. Optimize node local splits generated by + CombineFileInputFormat. (sseth) + BUG FIXES MAPREDUCE-5385. Fixed a bug with JobContext getCacheFiles API. (Omkar Vinit diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java index 1d3d64d18b..5556eee7c8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java @@ -21,13 +21,18 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashSet; import java.util.HashSet; import java.util.List; import java.util.HashMap; import java.util.Set; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -49,6 +54,8 @@ import org.apache.hadoop.net.NetworkTopology; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; /** * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in @@ -78,6 +85,8 @@ public abstract class CombineFileInputFormat extends FileInputFormat { + private static final Log LOG = LogFactory.getLog(CombineFileInputFormat.class); + public static final String SPLIT_MINSIZE_PERNODE = "mapreduce.input.fileinputformat.split.minsize.per.node"; public static final String SPLIT_MINSIZE_PERRACK = @@ -185,6 +194,8 @@ public List getSplits(JobContext job) maxSize = maxSplitSize; } else { maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0); + // If maxSize is not configured, a single split will be generated per + // node. 
} if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) { throw new IOException("Minimum split size pernode " + minSizeNode + @@ -257,8 +268,8 @@ private void getMoreSplits(JobContext job, List stats, new HashMap(); // mapping from a node to the list of blocks that it contains - HashMap> nodeToBlocks = - new HashMap>(); + HashMap> nodeToBlocks = + new HashMap>(); files = new OneFileInfo[stats.size()]; if (stats.size() == 0) { @@ -279,9 +290,9 @@ private void getMoreSplits(JobContext job, List stats, } @VisibleForTesting - void createSplits(HashMap> nodeToBlocks, - HashMap blockToNodes, - HashMap> rackToBlocks, + void createSplits(Map> nodeToBlocks, + Map blockToNodes, + Map> rackToBlocks, long totLength, long maxSize, long minSizeNode, @@ -289,83 +300,118 @@ void createSplits(HashMap> nodeToBlocks, List splits ) { ArrayList validBlocks = new ArrayList(); - Set nodes = new HashSet(); long curSplitSize = 0; - int numNodes = nodeToBlocks.size(); + int totalNodes = nodeToBlocks.size(); long totalLength = totLength; + Multiset splitsPerNode = HashMultiset.create(); + Set completedNodes = new HashSet(); + while(true) { // it is allowed for maxSize to be 0. Disable smoothing load for such cases - int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ? - ((int) (totalLength/maxSize))/numNodes - : Integer.MAX_VALUE; - int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1; - numNodes = 0; - // process all nodes and create splits that are local to a node. - for (Iterator>> iter = nodeToBlocks + // process all nodes and create splits that are local to a node. Generate + // one split per node iteration, and walk over nodes multiple times to + // distribute the splits across nodes. + for (Iterator>> iter = nodeToBlocks .entrySet().iterator(); iter.hasNext();) { - Map.Entry> one = iter.next(); - nodes.add(one.getKey()); - List blocksInNode = one.getValue(); + Map.Entry> one = iter.next(); + + String node = one.getKey(); + + // Skip the node if it has previously been marked as completed. + if (completedNodes.contains(node)) { + continue; + } + + Set blocksInCurrentNode = one.getValue(); // for each block, copy it into validBlocks. Delete it from // blockToNodes so that the same block does not appear in // two different splits. - int splitsInNode = 0; - for (OneBlockInfo oneblock : blocksInNode) { - if (blockToNodes.containsKey(oneblock)) { - validBlocks.add(oneblock); - blockToNodes.remove(oneblock); - curSplitSize += oneblock.length; + Iterator oneBlockIter = blocksInCurrentNode.iterator(); + while (oneBlockIter.hasNext()) { + OneBlockInfo oneblock = oneBlockIter.next(); + + // Remove all blocks which may already have been assigned to other + // splits. + if(!blockToNodes.containsKey(oneblock)) { + oneBlockIter.remove(); + continue; + } + + validBlocks.add(oneblock); + blockToNodes.remove(oneblock); + curSplitSize += oneblock.length; - // if the accumulated split size exceeds the maximum, then - // create this split. 
- if (maxSize != 0 && curSplitSize >= maxSize) { - // create an input split and add it to the splits array - addCreatedSplit(splits, nodes, validBlocks); - totalLength -= curSplitSize; - curSplitSize = 0; - validBlocks.clear(); - splitsInNode++; - if (splitsInNode == maxSplitsByNodeOnly) { - // stop grouping on a node so as not to create - // disproportionately more splits on a node because it happens - // to have many blocks - // consider only these nodes in next round of grouping because - // they have leftover blocks that may need to be grouped - numNodes++; - break; - } + // if the accumulated split size exceeds the maximum, then + // create this split. + if (maxSize != 0 && curSplitSize >= maxSize) { + // create an input split and add it to the splits array + addCreatedSplit(splits, Collections.singleton(node), validBlocks); + totalLength -= curSplitSize; + curSplitSize = 0; + + splitsPerNode.add(node); + + // Remove entries from blocksInNode so that we don't walk these + // again. + blocksInCurrentNode.removeAll(validBlocks); + validBlocks.clear(); + + // Done creating a single split for this node. Move on to the next + // node so that splits are distributed across nodes. + break; + } + + } + if (validBlocks.size() != 0) { + // This implies that the last few blocks (or all in case maxSize=0) + // were not part of a split. The node is complete. + + // if there were any blocks left over and their combined size is + // larger than minSplitNode, then combine them into one split. + // Otherwise add them back to the unprocessed pool. It is likely + // that they will be combined with other blocks from the + // same rack later on. + // This condition also kicks in when max split size is not set. All + // blocks on a node will be grouped together into a single split. + if (minSizeNode != 0 && curSplitSize >= minSizeNode + && splitsPerNode.count(node) == 0) { + // haven't created any split on this machine. so its ok to add a + // smaller one for parallelism. Otherwise group it in the rack for + // balanced size create an input split and add it to the splits + // array + addCreatedSplit(splits, Collections.singleton(node), validBlocks); + totalLength -= curSplitSize; + splitsPerNode.add(node); + // Remove entries from blocksInNode so that we don't walk this again. + blocksInCurrentNode.removeAll(validBlocks); + // The node is done. This was the last set of blocks for this node. + } else { + // Put the unplaced blocks back into the pool for later rack-allocation. + for (OneBlockInfo oneblock : validBlocks) { + blockToNodes.put(oneblock, oneblock.hosts); } } + validBlocks.clear(); + curSplitSize = 0; + completedNodes.add(node); + } else { // No in-flight blocks. + if (blocksInCurrentNode.size() == 0) { + // Node is done. All blocks were fit into node-local splits. + completedNodes.add(node); + } // else Run through the node again. } - // if there were any blocks left over and their combined size is - // larger than minSplitNode, then combine them into one split. - // Otherwise add them back to the unprocessed pool. It is likely - // that they will be combined with other blocks from the - // same rack later on. - if (minSizeNode != 0 && curSplitSize >= minSizeNode - && splitsInNode == 0) { - // haven't created any split on this machine. so its ok to add a - // smaller - // one for parallelism. 
Otherwise group it in the rack for balanced - // size - // create an input split and add it to the splits array - addCreatedSplit(splits, nodes, validBlocks); - totalLength -= curSplitSize; - } else { - for (OneBlockInfo oneblock : validBlocks) { - blockToNodes.put(oneblock, oneblock.hosts); - } - } - validBlocks.clear(); - nodes.clear(); - curSplitSize = 0; } - - if(!(numNodes>0 && totalLength>0)) { + + // Check if node-local assignments are complete. + if (completedNodes.size() == totalNodes || totalLength == 0) { + // All nodes have been walked over and marked as completed or all blocks + // have been assigned. The rest should be handled via rackLock assignment. + LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: " + + completedNodes.size() + ", size left: " + totalLength); break; } } @@ -514,7 +560,7 @@ static class OneFileInfo { boolean isSplitable, HashMap> rackToBlocks, HashMap blockToNodes, - HashMap> nodeToBlocks, + HashMap> nodeToBlocks, HashMap> rackToNodes, long maxSize) throws IOException { @@ -588,10 +634,10 @@ static class OneFileInfo { @VisibleForTesting static void populateBlockInfo(OneBlockInfo[] blocks, - HashMap> rackToBlocks, - HashMap blockToNodes, - HashMap> nodeToBlocks, - HashMap> rackToNodes) { + Map> rackToBlocks, + Map blockToNodes, + Map> nodeToBlocks, + Map> rackToNodes) { for (OneBlockInfo oneblock : blocks) { // add this block to the block --> node locations map blockToNodes.put(oneblock, oneblock.hosts); @@ -623,9 +669,9 @@ static void populateBlockInfo(OneBlockInfo[] blocks, // add this block to the node --> block map for (int j = 0; j < oneblock.hosts.length; j++) { String node = oneblock.hosts[j]; - List blklist = nodeToBlocks.get(node); + Set blklist = nodeToBlocks.get(node); if (blklist == null) { - blklist = new ArrayList(); + blklist = new LinkedHashSet(); nodeToBlocks.put(node, blklist); } blklist.add(oneblock); @@ -689,7 +735,7 @@ protected BlockLocation[] getFileBlockLocations( return fs.getFileBlockLocations(stat, 0, stat.getLen()); } - private static void addHostToRack(HashMap> rackToNodes, + private static void addHostToRack(Map> rackToNodes, String rack, String host) { Set hosts = rackToNodes.get(rack); if (hosts == null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java index 07ff2922d0..5730297b7e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java @@ -20,23 +20,31 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URI; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.ArrayList; +import java.util.Map; import java.util.Set; -import java.util.zip.GZIPOutputStream; +import java.util.TreeMap; import java.util.concurrent.TimeoutException; +import java.util.zip.GZIPOutputStream; import junit.framework.Assert; import junit.framework.TestCase; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.hdfs.MiniDFSCluster; +import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.HdfsBlockLocation; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -715,6 +723,69 @@ private static void writeDataAndSetReplication(FileSystem fileSys, Path name, out.close(); DFSTestUtil.waitReplication(fileSys, name, replication); } + + public void testNodeDistribution() throws IOException, InterruptedException { + DummyInputFormat inFormat = new DummyInputFormat(); + int numBlocks = 60; + long totLength = 0; + long blockSize = 100; + int numNodes = 10; + + long minSizeNode = 50; + long minSizeRack = 50; + int maxSplitSize = 200; // 4 blocks per split. + + String[] locations = new String[numNodes]; + for (int i = 0; i < numNodes; i++) { + locations[i] = "h" + i; + } + String[] racks = new String[0]; + Path path = new Path("hdfs://file"); + + OneBlockInfo[] blocks = new OneBlockInfo[numBlocks]; + + int hostCountBase = 0; + // Generate block list. Replication 3 per block. + for (int i = 0; i < numBlocks; i++) { + int localHostCount = hostCountBase; + String[] blockHosts = new String[3]; + for (int j = 0; j < 3; j++) { + int hostNum = localHostCount % numNodes; + blockHosts[j] = "h" + hostNum; + localHostCount++; + } + hostCountBase++; + blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, blockHosts, + racks); + totLength += blockSize; + } + + List splits = new ArrayList(); + HashMap> rackToNodes = new HashMap>(); + HashMap> rackToBlocks = new HashMap>(); + HashMap blockToNodes = new HashMap(); + Map> nodeToBlocks = new TreeMap>(); + + OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes, + nodeToBlocks, rackToNodes); + + inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, + maxSplitSize, minSizeNode, minSizeRack, splits); + + int expectedSplitCount = (int) (totLength / maxSplitSize); + Assert.assertEquals(expectedSplitCount, splits.size()); + + // Ensure 90+% of the splits have node local blocks. + // 100% locality may not always be achieved. + int numLocalSplits = 0; + for (InputSplit inputSplit : splits) { + Assert.assertEquals(maxSplitSize, inputSplit.getLength()); + if (inputSplit.getLocations().length == 1) { + numLocalSplits++; + } + } + Assert.assertTrue(numLocalSplits >= 0.9 * splits.size()); + } public void testNodeInputSplit() throws IOException, InterruptedException { // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on @@ -744,8 +815,8 @@ public void testNodeInputSplit() throws IOException, InterruptedException { new HashMap>(); HashMap blockToNodes = new HashMap(); - HashMap> nodeToBlocks = - new HashMap>(); + HashMap> nodeToBlocks = + new HashMap>(); OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
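
Editor's note, for reference only (not part of the patch): below is a minimal standalone Java sketch of the split-distribution strategy the reworked createSplits() uses, namely walk the nodes repeatedly, emit at most one node-local split per node per pass, count how many splits each node has received, and only fall back to a sub-maxSize split when a node has produced no split yet. It is an illustration under simplifying assumptions, not the patched Hadoop code: it ignores block replication, racks, and the shared blockToNodes pool, substitutes a plain HashMap counter for Guava's HashMultiset, and every name in it (NodeLocalGrouper, MAX_SPLIT_SIZE, MIN_SIZE_PER_NODE, the toy block sizes) is invented for the example.

// Sketch only: toy model of the "one split per node per pass" grouping.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NodeLocalGrouper {

  // Stand-ins for mapreduce.input.fileinputformat.split.maxsize and
  // mapreduce.input.fileinputformat.split.minsize.per.node.
  static final long MAX_SPLIT_SIZE = 200;
  static final long MIN_SIZE_PER_NODE = 50;

  public static void main(String[] args) {
    // Toy input: 3 nodes, each holding 4 unassigned blocks of 100 bytes.
    Map<String, Deque<Long>> nodeToBlocks = new LinkedHashMap<>();
    for (int n = 0; n < 3; n++) {
      Deque<Long> blocks = new ArrayDeque<>();
      for (int b = 0; b < 4; b++) {
        blocks.add(100L);
      }
      nodeToBlocks.put("h" + n, blocks);
    }

    Map<String, Integer> splitsPerNode = new HashMap<>(); // plain counter in place of a Multiset
    Set<String> completedNodes = new HashSet<>();
    List<String> splits = new ArrayList<>();

    while (completedNodes.size() < nodeToBlocks.size()) {
      // One pass over all nodes; each pass emits at most one split per node,
      // so splits are spread across nodes instead of piling up on the first one.
      for (Map.Entry<String, Deque<Long>> e : nodeToBlocks.entrySet()) {
        String node = e.getKey();
        if (completedNodes.contains(node)) {
          continue;
        }
        Deque<Long> blocks = e.getValue();
        List<Long> validBlocks = new ArrayList<>();
        long curSplitSize = 0;
        while (!blocks.isEmpty() && curSplitSize < MAX_SPLIT_SIZE) {
          long len = blocks.poll();
          validBlocks.add(len);
          curSplitSize += len;
        }
        if (curSplitSize >= MAX_SPLIT_SIZE) {
          // Full-sized node-local split; leftovers wait for the next pass.
          splits.add(node + validBlocks);
          splitsPerNode.merge(node, 1, Integer::sum);
        } else if (curSplitSize >= MIN_SIZE_PER_NODE
            && splitsPerNode.getOrDefault(node, 0) == 0) {
          // Leftovers are big enough and this node has no split yet:
          // emit a smaller split for parallelism, then finish the node.
          splits.add(node + validBlocks);
          splitsPerNode.merge(node, 1, Integer::sum);
          completedNodes.add(node);
        } else {
          // Leftovers too small, or the node already has a split. The real
          // code returns these blocks to blockToNodes for rack-level grouping
          // later; the toy version simply stops tracking them.
          completedNodes.add(node);
        }
        if (blocks.isEmpty()) {
          completedNodes.add(node);
        }
      }
    }
    // Prints six splits alternating h0, h1, h2 (two per node).
    System.out.println(splits);
  }
}

Compared with the previous behaviour, which kept creating splits from a single node until that node hit its avgSplitsPerNode-derived cap, the per-pass round robin spreads splits roughly evenly across nodes while still preferring node-local blocks. That is the effect testNodeDistribution above asserts: every split has exactly maxSplitSize bytes, and at least 90% of the splits end up with a single (node-local) location.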