diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 481757ab50..002fbe6aba 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -368,6 +368,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-5649. Reduce cannot use more than 2G memory for the final merge (Gera Shegalov via jlowe) + MAPREDUCE-6165. [JDK8] TestCombineFileInputFormat failed on JDK8. + (Akira AJISAKA via ozawa) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java index 040c54be97..b2b7656063 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java @@ -29,7 +29,6 @@ import java.util.Set; import java.util.Iterator; import java.util.Map; -import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -289,6 +288,26 @@ private void getMoreSplits(JobContext job, List stats, maxSize, minSizeNode, minSizeRack, splits); } + /** + * Process all the nodes and create splits that are local to a node. + * Generate one split per node iteration, and walk over nodes multiple times + * to distribute the splits across nodes. + *

+ * Note: The order of processing the nodes is undetermined because the + * implementation of nodeToBlocks is {@link java.util.HashMap} and its order + * of the entries is undetermined. + * @param nodeToBlocks Mapping from a node to the list of blocks that + * it contains. + * @param blockToNodes Mapping from a block to the nodes on which + * it has replicas. + * @param rackToBlocks Mapping from a rack name to the list of blocks it has. + * @param totLength Total length of the input files. + * @param maxSize Max size of each split. + * If set to 0, disable smoothing load. + * @param minSizeNode Minimum split size per node. + * @param minSizeRack Minimum split size per rack. + * @param splits New splits created by this method are added to the list. + */ @VisibleForTesting void createSplits(Map> nodeToBlocks, Map blockToNodes, @@ -309,11 +328,6 @@ void createSplits(Map> nodeToBlocks, Set completedNodes = new HashSet(); while(true) { - // it is allowed for maxSize to be 0. Disable smoothing load for such cases - - // process all nodes and create splits that are local to a node. Generate - // one split per node iteration, and walk over nodes multiple times to - // distribute the splits across nodes. for (Iterator>> iter = nodeToBlocks .entrySet().iterator(); iter.hasNext();) { Map.Entry> one = iter.next(); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java index 85c675c308..b49f2d831a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java @@ -22,6 +22,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -53,13 +54,22 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo; import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; + +import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import com.google.common.collect.HashMultiset; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; public class TestCombineFileInputFormat { @@ -92,6 +102,14 @@ public class TestCombineFileInputFormat { static final int BLOCKSIZE = 1024; static final byte[] databuf = new byte[BLOCKSIZE]; + @Mock + private List mockList; + + @Before + public void initMocks() { + MockitoAnnotations.initMocks(this); + } + private static final String DUMMY_FS_URI = "dummyfs:///"; /** Dummy class to extend CombineFileInputFormat*/ @@ -299,7 +317,51 @@ public void testReinit() throws Exception { assertFalse(rr.nextKeyValue()); } + /** + * For testing each split has the expected name, length, and offset. + */ + private final class Split { + private String name; + private long length; + private long offset; + + public Split(String name, long length, long offset) { + this.name = name; + this.length = length; + this.offset = offset; + } + + public String getName() { + return name; + } + + public long getLength() { + return length; + } + + public long getOffset() { + return offset; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Split) { + Split split = ((Split) obj); + return split.name.equals(name) && split.length == length + && split.offset == offset; + } + return false; + } + } + + /** + * The test suppresses unchecked warnings in + * {@link org.mockito.Mockito#reset}. Although calling the method is + * a bad manner, we call the method instead of splitting the test + * (i.e. restarting MiniDFSCluster) to save time. + */ @Test + @SuppressWarnings("unchecked") public void testSplitPlacement() throws Exception { MiniDFSCluster dfs = null; FileSystem fileSys = null; @@ -326,10 +388,10 @@ public void testSplitPlacement() throws Exception { throw new IOException("Mkdirs failed to create " + inDir.toString()); } Path file1 = new Path(dir1 + "/file1"); - writeFile(conf, file1, (short)1, 1); + writeFile(conf, file1, (short) 1, 1); // create another file on the same datanode Path file5 = new Path(dir5 + "/file5"); - writeFile(conf, file5, (short)1, 1); + writeFile(conf, file5, (short) 1, 1); // split it using a CombinedFile input format DummyInputFormat inFormat = new DummyInputFormat(); Job job = Job.getInstance(conf); @@ -350,13 +412,13 @@ public void testSplitPlacement() throws Exception { assertEquals(0, fileSplit.getOffset(1)); assertEquals(BLOCKSIZE, fileSplit.getLength(1)); assertEquals(hosts1[0], fileSplit.getLocations()[0]); - + dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null); dfs.waitActive(); // create file on two datanodes. Path file2 = new Path(dir2 + "/file2"); - writeFile(conf, file2, (short)2, 2); + writeFile(conf, file2, (short) 2, 2); // split it using a CombinedFile input format inFormat = new DummyInputFormat(); @@ -365,34 +427,67 @@ public void testSplitPlacement() throws Exception { splits = inFormat.getSplits(job); System.out.println("Made splits(Test1): " + splits.size()); - // make sure that each split has different locations for (InputSplit split : splits) { System.out.println("File split(Test1): " + split); } - assertEquals(2, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file2.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + /** + * If rack1 is processed first by + * {@link CombineFileInputFormat#createSplits}, + * create only one split on rack1. Otherwise create two splits. + */ + if (splits.size() == 2) { + // first split is on rack2, contains file2 + if (split.equals(splits.get(0))) { + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); + } + // second split is on rack1, contains file1 + if (split.equals(splits.get(1))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 1) { + // first split is on rack1, contains file1 and file2. + assertEquals(3, fileSplit.getNumPaths()); + Set expected = new HashSet<>(); + expected.add(new Split(file1.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file2.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE)); + List actual = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + assertTrue(actual.containsAll(expected)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } else { + fail("Expected split size is 1 or 2, but actual size is " + + splits.size()); + } + } // create another file on 3 datanodes and 3 racks. dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null); dfs.waitActive(); Path file3 = new Path(dir3 + "/file3"); - writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3); + writeFile(conf, new Path(dir3 + "/file3"), (short) 3, 3); inFormat = new DummyInputFormat(); FileInputFormat.setInputPaths(job, dir1 + "," + dir2 + "," + dir3); inFormat.setMinSplitSizeRack(BLOCKSIZE); @@ -400,37 +495,98 @@ public void testSplitPlacement() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test2): " + split); } - assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(3, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file3.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(file3.getName(), fileSplit.getPath(2).getName()); - assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); - assertEquals(BLOCKSIZE, fileSplit.getLength(2)); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file2.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + Set expected = new HashSet<>(); + expected.add(new Split(file1.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file2.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE)); + expected.add(new Split(file3.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE)); + expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2)); + List actual = new ArrayList<>(); + + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + /** + * If rack1 is processed first by + * {@link CombineFileInputFormat#createSplits}, + * create only one split on rack1. + * If rack2 or rack3 is processed first and rack1 is processed second, + * create one split on rack2 or rack3 and the other split is on rack1. + * Otherwise create 3 splits for each rack. + */ + if (splits.size() == 3) { + // first split is on rack3, contains file3 + if (split.equals(splits.get(0))) { + assertEquals(3, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file3.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(file3.getName(), fileSplit.getPath(2).getName()); + assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); + assertEquals(BLOCKSIZE, fileSplit.getLength(2)); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); + } + // second split is on rack2, contains file2 + if (split.equals(splits.get(1))) { + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); + } + // third split is on rack1, contains file1 + if (split.equals(splits.get(2))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 2) { + // first split is on rack2 or rack3, contains one or two files. + if (split.equals(splits.get(0))) { + assertEquals(1, fileSplit.getLocations().length); + if (fileSplit.getLocations()[0].equals(hosts2[0])) { + assertEquals(2, fileSplit.getNumPaths()); + } else if (fileSplit.getLocations()[0].equals(hosts3[0])) { + assertEquals(3, fileSplit.getNumPaths()); + } else { + fail("First split should be on rack2 or rack3."); + } + } + // second split is on rack1, contains the rest files. + if (split.equals(splits.get(1))) { + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 1) { + // first split is rack1, contains all three files. + assertEquals(1, fileSplit.getLocations().length); + assertEquals(6, fileSplit.getNumPaths()); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } else { + fail("Split size should be 1, 2, or 3."); + } + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + } + + assertEquals(6, actual.size()); + assertTrue(actual.containsAll(expected)); // create file4 on all three racks Path file4 = new Path(dir4 + "/file4"); @@ -442,37 +598,85 @@ public void testSplitPlacement() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test3): " + split); } - assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(6, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file3.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(file3.getName(), fileSplit.getPath(2).getName()); - assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); - assertEquals(BLOCKSIZE, fileSplit.getLength(2)); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file2.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + expected.add(new Split(file4.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE)); + expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2)); + actual.clear(); + + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + /** + * If rack1 is processed first by + * {@link CombineFileInputFormat#createSplits}, + * create only one split on rack1. + * If rack2 or rack3 is processed first and rack1 is processed second, + * create one split on rack2 or rack3 and the other split is on rack1. + * Otherwise create 3 splits for each rack. + */ + if (splits.size() == 3) { + // first split is on rack3, contains file3 and file4 + if (split.equals(splits.get(0))) { + assertEquals(6, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); + } + // second split is on rack2, contains file2 + if (split.equals(splits.get(1))) { + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(file2.getName(), fileSplit.getPath(1).getName()); + assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); + assertEquals(BLOCKSIZE, fileSplit.getLength(1)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); + } + // third split is on rack1, contains file1 + if (split.equals(splits.get(2))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 2) { + // first split is on rack2 or rack3, contains two or three files. + if (split.equals(splits.get(0))) { + assertEquals(1, fileSplit.getLocations().length); + if (fileSplit.getLocations()[0].equals(hosts2[0])) { + assertEquals(5, fileSplit.getNumPaths()); + } else if (fileSplit.getLocations()[0].equals(hosts3[0])) { + assertEquals(6, fileSplit.getNumPaths()); + } else { + fail("First split should be on rack2 or rack3."); + } + } + // second split is on rack1, contains the rest files. + if (split.equals(splits.get(1))) { + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 1) { + // first split is rack1, contains all four files. + assertEquals(1, fileSplit.getLocations().length); + assertEquals(9, fileSplit.getNumPaths()); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } else { + fail("Split size should be 1, 2, or 3."); + } + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + } + + assertEquals(9, actual.size()); + assertTrue(actual.containsAll(expected)); // maximum split size is 2 blocks inFormat = new DummyInputFormat(); @@ -485,34 +689,26 @@ public void testSplitPlacement() throws Exception { System.out.println("File split(Test4): " + split); } assertEquals(5, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file3.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file2.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals("host2.rack2.com", fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file3.getName(), fileSplit.getPath(1).getName()); - assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals("host1.rack1.com", fileSplit.getLocations()[0]); + + actual.clear(); + reset(mockList); + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + mockList.add(fileSplit.getLocations()[0]); + } + + assertEquals(9, actual.size()); + assertTrue(actual.containsAll(expected)); + // verify the splits are on all the racks + verify(mockList, atLeastOnce()).add(hosts1[0]); + verify(mockList, atLeastOnce()).add(hosts2[0]); + verify(mockList, atLeastOnce()).add(hosts3[0]); // maximum split size is 3 blocks inFormat = new DummyInputFormat(); @@ -524,44 +720,26 @@ public void testSplitPlacement() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test5): " + split); } + assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(3, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file3.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(file3.getName(), fileSplit.getPath(2).getName()); - assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); - assertEquals(BLOCKSIZE, fileSplit.getLength(2)); - assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file2.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(file4.getName(), fileSplit.getPath(2).getName()); - assertEquals(0, fileSplit.getOffset(2)); - assertEquals(BLOCKSIZE, fileSplit.getLength(2)); - assertEquals("host2.rack2.com", fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(3, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file4.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(file4.getName(), fileSplit.getPath(2).getName()); - assertEquals(2*BLOCKSIZE, fileSplit.getOffset(2)); - assertEquals(BLOCKSIZE, fileSplit.getLength(2)); - assertEquals("host1.rack1.com", fileSplit.getLocations()[0]); + + actual.clear(); + reset(mockList); + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + mockList.add(fileSplit.getLocations()[0]); + } + + assertEquals(9, actual.size()); + assertTrue(actual.containsAll(expected)); + verify(mockList, atLeastOnce()).add(hosts1[0]); + verify(mockList, atLeastOnce()).add(hosts2[0]); // maximum split size is 4 blocks inFormat = new DummyInputFormat(); @@ -572,41 +750,23 @@ public void testSplitPlacement() throws Exception { System.out.println("File split(Test6): " + split); } assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(4, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file3.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(file3.getName(), fileSplit.getPath(2).getName()); - assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2)); - assertEquals(BLOCKSIZE, fileSplit.getLength(2)); - assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(4, fileSplit.getNumPaths()); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(file2.getName(), fileSplit.getPath(1).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(1)); - assertEquals(BLOCKSIZE, fileSplit.getLength(1)); - assertEquals(file4.getName(), fileSplit.getPath(2).getName()); - assertEquals(BLOCKSIZE, fileSplit.getOffset(2)); - assertEquals(BLOCKSIZE, fileSplit.getLength(2)); - assertEquals(file4.getName(), fileSplit.getPath(3).getName()); - assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(3)); - assertEquals(BLOCKSIZE, fileSplit.getLength(3)); - assertEquals("host2.rack2.com", fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(BLOCKSIZE, fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + actual.clear(); + reset(mockList); + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + mockList.add(fileSplit.getLocations()[0]); + } + + assertEquals(9, actual.size()); + assertTrue(actual.containsAll(expected)); + verify(mockList, atLeastOnce()).add(hosts1[0]); // maximum split size is 7 blocks and min is 3 blocks inFormat = new DummyInputFormat(); @@ -619,20 +779,31 @@ public void testSplitPlacement() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test7): " + split); } + assertEquals(2, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(6, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals("host3.rack3.com", fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(3, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals("host1.rack1.com", fileSplit.getLocations()[0]); + + actual.clear(); + reset(mockList); + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + mockList.add(fileSplit.getLocations()[0]); + } + + assertEquals(9, actual.size()); + assertTrue(actual.containsAll(expected)); + verify(mockList, atLeastOnce()).add(hosts1[0]); // Rack 1 has file1, file2 and file3 and file4 // Rack 2 has file2 and file3 and file4 // Rack 3 has file3 and file4 - // setup a filter so that only file1 and file2 can be combined + // setup a filter so that only (file1 and file2) or (file3 and file4) + // can be combined inFormat = new DummyInputFormat(); FileInputFormat.addInputPath(job, inDir); inFormat.setMinSplitSizeRack(1); // everything is at least rack local @@ -642,19 +813,101 @@ public void testSplitPlacement() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test1): " + split); } - assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(6, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + if (splits.size() == 2) { + // first split is on rack1, contains file1 and file2. + if (split.equals(splits.get(0))) { + assertEquals(3, fileSplit.getNumPaths()); + expected.clear(); + expected.add(new Split(file1.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file2.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE)); + actual.clear(); + for (int i = 0; i < 3; i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + assertTrue(actual.containsAll(expected)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + if (split.equals(splits.get(1))) { + // second split contains the file3 and file4, however, + // the locations is undetermined. + assertEquals(6, fileSplit.getNumPaths()); + expected.clear(); + expected.add(new Split(file3.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE)); + expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2)); + expected.add(new Split(file4.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE)); + expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2)); + actual.clear(); + for (int i = 0; i < 6; i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + assertTrue(actual.containsAll(expected)); + assertEquals(1, fileSplit.getLocations().length); + } + } else if (splits.size() == 3) { + if (split.equals(splits.get(0))) { + // first split is on rack2, contains file2 + assertEquals(2, fileSplit.getNumPaths()); + expected.clear(); + expected.add(new Split(file2.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file2.getName(), BLOCKSIZE, BLOCKSIZE)); + actual.clear(); + for (int i = 0; i < 2; i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + assertTrue(actual.containsAll(expected)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); + } + if (split.equals(splits.get(1))) { + // second split is on rack1, contains file1 + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(BLOCKSIZE, fileSplit.getLength(0)); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + if (split.equals(splits.get(2))) { + // third split contains file3 and file4, however, + // the locations is undetermined. + assertEquals(6, fileSplit.getNumPaths()); + expected.clear(); + expected.add(new Split(file3.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE)); + expected.add(new Split(file3.getName(), BLOCKSIZE, BLOCKSIZE * 2)); + expected.add(new Split(file4.getName(), BLOCKSIZE, 0)); + expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE)); + expected.add(new Split(file4.getName(), BLOCKSIZE, BLOCKSIZE * 2)); + actual.clear(); + for (int i = 0; i < 6; i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + assertTrue(actual.containsAll(expected)); + assertEquals(1, fileSplit.getLocations().length); + } + } else { + fail("Split size should be 2 or 3."); + } + } // measure performance when there are multiple pools and // many files in each pool. @@ -844,7 +1097,14 @@ public void testNodeInputSplit() throws IOException, InterruptedException { assertEquals(3, nodeSplits.count(locations[1])); } + /** + * The test suppresses unchecked warnings in + * {@link org.mockito.Mockito#reset}. Although calling the method is + * a bad manner, we call the method instead of splitting the test + * (i.e. restarting MiniDFSCluster) to save time. + */ @Test + @SuppressWarnings("unchecked") public void testSplitPlacementForCompressedFiles() throws Exception { MiniDFSCluster dfs = null; FileSystem fileSys = null; @@ -915,21 +1175,55 @@ public void testSplitPlacementForCompressedFiles() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test1): " + split); } - assertEquals(2, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f2.getLen(), fileSplit.getLength(0)); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f1.getLen(), fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + Set expected = new HashSet<>(); + expected.add(new Split(file1.getName(), f1.getLen(), 0)); + expected.add(new Split(file2.getName(), f2.getLen(), 0)); + List actual = new ArrayList<>(); + + /** + * If rack1 is processed first by + * {@link CombineFileInputFormat#createSplits}, + * create only one split on rack1. Otherwise create two splits. + */ + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + if (splits.size() == 2) { + if (split.equals(splits.get(0))) { + // first split is on rack2, contains file2. + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f2.getLen(), fileSplit.getLength(0)); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); + } + if (split.equals(splits.get(1))) { + // second split is on rack1, contains file1. + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 1) { + // first split is on rack1, contains file1 and file2. + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } else { + fail("Split size should be 1 or 2."); + } + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + } + assertEquals(2, actual.size()); + assertTrue(actual.containsAll(expected)); // create another file on 3 datanodes and 3 racks. dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null); @@ -943,28 +1237,83 @@ public void testSplitPlacementForCompressedFiles() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test2): " + split); } - assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f3.getLen(), fileSplit.getLength(0)); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f2.getLen(), fileSplit.getLength(0)); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f1.getLen(), fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + expected.add(new Split(file3.getName(), f3.getLen(), 0)); + actual.clear(); + + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + /** + * If rack1 is processed first by + * {@link CombineFileInputFormat#createSplits}, + * create only one split on rack1. + * If rack2 or rack3 is processed first and rack1 is processed second, + * create one split on rack2 or rack3 and the other split is on rack1. + * Otherwise create 3 splits for each rack. + */ + if (splits.size() == 3) { + // first split is on rack3, contains file3 + if (split.equals(splits.get(0))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(file3.getName(), fileSplit.getPath(0).getName()); + assertEquals(f3.getLen(), fileSplit.getLength(0)); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); + } + // second split is on rack2, contains file2 + if (split.equals(splits.get(1))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(f2.getLen(), fileSplit.getLength(0)); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); + } + // third split is on rack1, contains file1 + if (split.equals(splits.get(2))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 2) { + // first split is on rack2 or rack3, contains one or two files. + if (split.equals(splits.get(0))) { + assertEquals(1, fileSplit.getLocations().length); + if (fileSplit.getLocations()[0].equals(hosts2[0])) { + assertEquals(2, fileSplit.getNumPaths()); + } else if (fileSplit.getLocations()[0].equals(hosts3[0])) { + assertEquals(1, fileSplit.getNumPaths()); + } else { + fail("First split should be on rack2 or rack3."); + } + } + // second split is on rack1, contains the rest files. + if (split.equals(splits.get(1))) { + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 1) { + // first split is rack1, contains all three files. + assertEquals(1, fileSplit.getLocations().length); + assertEquals(3, fileSplit.getNumPaths()); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } else { + fail("Split size should be 1, 2, or 3."); + } + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + } + + assertEquals(3, actual.size()); + assertTrue(actual.containsAll(expected)); // create file4 on all three racks Path file4 = new Path(dir4 + "/file4.gz"); @@ -977,31 +1326,79 @@ public void testSplitPlacementForCompressedFiles() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test3): " + split); } - assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f3.getLen(), fileSplit.getLength(0)); - assertEquals(file4.getName(), fileSplit.getPath(1).getName()); - assertEquals(0, fileSplit.getOffset(1)); - assertEquals(f4.getLen(), fileSplit.getLength(1)); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f2.getLen(), fileSplit.getLength(0)); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f1.getLen(), fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + expected.add(new Split(file3.getName(), f3.getLen(), 0)); + actual.clear(); + + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + /** + * If rack1 is processed first by + * {@link CombineFileInputFormat#createSplits}, + * create only one split on rack1. + * If rack2 or rack3 is processed first and rack1 is processed second, + * create one split on rack2 or rack3 and the other split is on rack1. + * Otherwise create 3 splits for each rack. + */ + if (splits.size() == 3) { + // first split is on rack3, contains file3 and file4 + if (split.equals(splits.get(0))) { + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); + } + // second split is on rack2, contains file2 + if (split.equals(splits.get(1))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(file2.getName(), fileSplit.getPath(0).getName()); + assertEquals(f2.getLen(), fileSplit.getLength(0)); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); + } + // third split is on rack1, contains file1 + if (split.equals(splits.get(2))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(file1.getName(), fileSplit.getPath(0).getName()); + assertEquals(f1.getLen(), fileSplit.getLength(0)); + assertEquals(0, fileSplit.getOffset(0)); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 2) { + // first split is on rack2 or rack3, contains two or three files. + if (split.equals(splits.get(0))) { + assertEquals(1, fileSplit.getLocations().length); + if (fileSplit.getLocations()[0].equals(hosts2[0])) { + assertEquals(3, fileSplit.getNumPaths()); + } else if (fileSplit.getLocations()[0].equals(hosts3[0])) { + assertEquals(2, fileSplit.getNumPaths()); + } else { + fail("First split should be on rack2 or rack3."); + } + } + // second split is on rack1, contains the rest files. + if (split.equals(splits.get(1))) { + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 1) { + // first split is rack1, contains all four files. + assertEquals(1, fileSplit.getLocations().length); + assertEquals(4, fileSplit.getNumPaths()); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } else { + fail("Split size should be 1, 2, or 3."); + } + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + } + + assertEquals(4, actual.size()); + assertTrue(actual.containsAll(expected)); // maximum split size is file1's length inFormat = new DummyInputFormat(); @@ -1014,32 +1411,24 @@ public void testSplitPlacementForCompressedFiles() throws Exception { System.out.println("File split(Test4): " + split); } assertEquals(4, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f3.getLen(), fileSplit.getLength(0)); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f2.getLen(), fileSplit.getLength(0)); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r3 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f1.getLen(), fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(3); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file4.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f4.getLen(), fileSplit.getLength(0)); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r1 + + actual.clear(); + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + mockList.add(fileSplit.getLocations()[0]); + } + + assertEquals(4, actual.size()); + assertTrue(actual.containsAll(expected)); + verify(mockList, atLeastOnce()).add(hosts1[0]); + verify(mockList, atLeastOnce()).add(hosts2[0]); + verify(mockList, atLeastOnce()).add(hosts3[0]); // maximum split size is twice file1's length inFormat = new DummyInputFormat(); @@ -1051,31 +1440,33 @@ public void testSplitPlacementForCompressedFiles() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test5): " + split); } - assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f3.getLen(), fileSplit.getLength(0)); - assertEquals(file4.getName(), fileSplit.getPath(1).getName()); - assertEquals(0, fileSplit.getOffset(1)); - assertEquals(f4.getLen(), fileSplit.getLength(1)); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file2.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f2.getLen(), fileSplit.getLength(0)); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f1.getLen(), fileSplit.getLength(0)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + actual.clear(); + reset(mockList); + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + mockList.add(fileSplit.getLocations()[0]); + } + assertEquals(4, actual.size()); + assertTrue(actual.containsAll(expected)); + + if (splits.size() == 3) { + // splits are on all the racks + verify(mockList, times(1)).add(hosts1[0]); + verify(mockList, times(1)).add(hosts2[0]); + verify(mockList, times(1)).add(hosts3[0]); + } else if (splits.size() == 2) { + // one split is on rack1, another split is on rack2 or rack3 + verify(mockList, times(1)).add(hosts1[0]); + } else { + fail("Split size should be 2 or 3."); + } // maximum split size is 4 times file1's length inFormat = new DummyInputFormat(); @@ -1087,26 +1478,29 @@ public void testSplitPlacementForCompressedFiles() throws Exception { for (InputSplit split : splits) { System.out.println("File split(Test6): " + split); } - assertEquals(2, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(file3.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f3.getLen(), fileSplit.getLength(0)); - assertEquals(file4.getName(), fileSplit.getPath(1).getName()); - assertEquals(0, fileSplit.getOffset(1)); - assertEquals(f4.getLen(), fileSplit.getLength(1)); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(file1.getName(), fileSplit.getPath(0).getName()); - assertEquals(0, fileSplit.getOffset(0)); - assertEquals(f1.getLen(), fileSplit.getLength(0)); - assertEquals(file2.getName(), fileSplit.getPath(1).getName()); - assertEquals(0, fileSplit.getOffset(1), BLOCKSIZE); - assertEquals(f2.getLen(), fileSplit.getLength(1)); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 + + /** + * If rack1 is processed first by + * {@link CombineFileInputFormat#createSplits}, + * create only one split on rack1. Otherwise create two splits. + */ + assertTrue("Split size should be 1 or 2.", + splits.size() == 1 || splits.size() == 2); + actual.clear(); + reset(mockList); + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + mockList.add(fileSplit.getLocations()[0]); + } + assertEquals(4, actual.size()); + assertTrue(actual.containsAll(expected)); + verify(mockList, times(1)).add(hosts1[0]); // maximum split size and min-split-size per rack is 4 times file1's length inFormat = new DummyInputFormat(); @@ -1146,25 +1540,57 @@ public void testSplitPlacementForCompressedFiles() throws Exception { inFormat = new DummyInputFormat(); FileInputFormat.addInputPath(job, inDir); inFormat.setMinSplitSizeRack(1); // everything is at least rack local - inFormat.createPool(new TestFilter(dir1), - new TestFilter(dir2)); + inFormat.createPool(new TestFilter(dir1), + new TestFilter(dir2)); splits = inFormat.getSplits(job); for (InputSplit split : splits) { System.out.println("File split(Test9): " + split); } - assertEquals(3, splits.size()); - fileSplit = (CombineFileSplit) splits.get(0); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2 - fileSplit = (CombineFileSplit) splits.get(1); - assertEquals(1, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1 - fileSplit = (CombineFileSplit) splits.get(2); - assertEquals(2, fileSplit.getNumPaths()); - assertEquals(1, fileSplit.getLocations().length); - assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3 + + actual.clear(); + for (InputSplit split : splits) { + fileSplit = (CombineFileSplit) split; + if (splits.size() == 3) { + // If rack2 is processed first + if (split.equals(splits.get(0))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts2[0], fileSplit.getLocations()[0]); + } + if (split.equals(splits.get(1))) { + assertEquals(1, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + if (split.equals(splits.get(2))) { + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); + } + } else if (splits.size() == 2) { + // If rack1 is processed first + if (split.equals(splits.get(0))) { + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts1[0], fileSplit.getLocations()[0]); + } + if (split.equals(splits.get(1))) { + assertEquals(2, fileSplit.getNumPaths()); + assertEquals(1, fileSplit.getLocations().length); + assertEquals(hosts3[0], fileSplit.getLocations()[0]); + } + } else { + fail("Split size should be 2 or 3."); + } + for (int i = 0; i < fileSplit.getNumPaths(); i++) { + String name = fileSplit.getPath(i).getName(); + long length = fileSplit.getLength(i); + long offset = fileSplit.getOffset(i); + actual.add(new Split(name, length, offset)); + } + } + assertEquals(4, actual.size()); + assertTrue(actual.containsAll(expected)); // measure performance when there are multiple pools and // many files in each pool.