HADOOP-9230. TestUniformSizeInputFormat fails intermittently. (kkambatl via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1451291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5363d831a0
commit
6401d5d2ec
@ -397,6 +397,9 @@ Release 2.0.4-beta - UNRELEASED
|
|||||||
|
|
||||||
HADOOP-9342. Remove jline from distribution. (thw via tucu)
|
HADOOP-9342. Remove jline from distribution. (thw via tucu)
|
||||||
|
|
||||||
|
HADOOP-9230. TestUniformSizeInputFormat fails intermittently.
|
||||||
|
(kkambatl via tucu)
|
||||||
|
|
||||||
Release 2.0.3-alpha - 2013-02-06
|
Release 2.0.3-alpha - 2013-02-06
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -33,8 +33,6 @@
|
|||||||
import org.apache.hadoop.tools.DistCpOptions;
|
import org.apache.hadoop.tools.DistCpOptions;
|
||||||
import org.apache.hadoop.tools.StubContext;
|
import org.apache.hadoop.tools.StubContext;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
@ -48,9 +46,6 @@
|
|||||||
|
|
||||||
|
|
||||||
public class TestUniformSizeInputFormat {
|
public class TestUniformSizeInputFormat {
|
||||||
private static final Log LOG
|
|
||||||
= LogFactory.getLog(TestUniformSizeInputFormat.class);
|
|
||||||
|
|
||||||
private static MiniDFSCluster cluster;
|
private static MiniDFSCluster cluster;
|
||||||
private static final int N_FILES = 20;
|
private static final int N_FILES = 20;
|
||||||
private static final int SIZEOF_EACH_FILE=1024;
|
private static final int SIZEOF_EACH_FILE=1024;
|
||||||
@ -118,12 +113,9 @@ public void testGetSplits(int nMaps) throws Exception {
|
|||||||
List<InputSplit> splits
|
List<InputSplit> splits
|
||||||
= uniformSizeInputFormat.getSplits(jobContext);
|
= uniformSizeInputFormat.getSplits(jobContext);
|
||||||
|
|
||||||
List<InputSplit> legacySplits = legacyGetSplits(listFile, nMaps);
|
|
||||||
|
|
||||||
int sizePerMap = totalFileSize/nMaps;
|
int sizePerMap = totalFileSize/nMaps;
|
||||||
|
|
||||||
checkSplits(listFile, splits);
|
checkSplits(listFile, splits);
|
||||||
checkAgainstLegacy(splits, legacySplits);
|
|
||||||
|
|
||||||
int doubleCheckedTotalSize = 0;
|
int doubleCheckedTotalSize = 0;
|
||||||
int previousSplitSize = -1;
|
int previousSplitSize = -1;
|
||||||
@ -155,57 +147,6 @@ public void testGetSplits(int nMaps) throws Exception {
|
|||||||
Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
|
Assert.assertEquals(totalFileSize, doubleCheckedTotalSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
// From
|
|
||||||
// http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/DistCp.java
|
|
||||||
private List<InputSplit> legacyGetSplits(Path listFile, int numSplits)
|
|
||||||
throws IOException {
|
|
||||||
|
|
||||||
FileSystem fs = cluster.getFileSystem();
|
|
||||||
FileStatus srcst = fs.getFileStatus(listFile);
|
|
||||||
Configuration conf = fs.getConf();
|
|
||||||
|
|
||||||
ArrayList<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
|
|
||||||
FileStatus value = new FileStatus();
|
|
||||||
Text key = new Text();
|
|
||||||
final long targetsize = totalFileSize / numSplits;
|
|
||||||
long pos = 0L;
|
|
||||||
long last = 0L;
|
|
||||||
long acc = 0L;
|
|
||||||
long cbrem = srcst.getLen();
|
|
||||||
SequenceFile.Reader sl = null;
|
|
||||||
|
|
||||||
LOG.info("Average bytes per map: " + targetsize +
|
|
||||||
", Number of maps: " + numSplits + ", total size: " + totalFileSize);
|
|
||||||
|
|
||||||
try {
|
|
||||||
sl = new SequenceFile.Reader(conf, SequenceFile.Reader.file(listFile));
|
|
||||||
for (; sl.next(key, value); last = sl.getPosition()) {
|
|
||||||
// if adding this split would put this split past the target size,
|
|
||||||
// cut the last split and put this next file in the next split.
|
|
||||||
if (acc + value.getLen() > targetsize && acc != 0) {
|
|
||||||
long splitsize = last - pos;
|
|
||||||
FileSplit fileSplit = new FileSplit(listFile, pos, splitsize, null);
|
|
||||||
LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + splitsize);
|
|
||||||
splits.add(fileSplit);
|
|
||||||
cbrem -= splitsize;
|
|
||||||
pos = last;
|
|
||||||
acc = 0L;
|
|
||||||
}
|
|
||||||
acc += value.getLen();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
IOUtils.closeStream(sl);
|
|
||||||
}
|
|
||||||
if (cbrem != 0) {
|
|
||||||
FileSplit fileSplit = new FileSplit(listFile, pos, cbrem, null);
|
|
||||||
LOG.info ("Creating split : " + fileSplit + ", bytes in split: " + cbrem);
|
|
||||||
splits.add(fileSplit);
|
|
||||||
}
|
|
||||||
|
|
||||||
return splits;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
|
private void checkSplits(Path listFile, List<InputSplit> splits) throws IOException {
|
||||||
long lastEnd = 0;
|
long lastEnd = 0;
|
||||||
|
|
||||||
@ -233,18 +174,6 @@ private void checkSplits(Path listFile, List<InputSplit> splits) throws IOExcept
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkAgainstLegacy(List<InputSplit> splits,
|
|
||||||
List<InputSplit> legacySplits)
|
|
||||||
throws IOException, InterruptedException {
|
|
||||||
|
|
||||||
Assert.assertEquals(legacySplits.size(), splits.size());
|
|
||||||
for (int index = 0; index < splits.size(); index++) {
|
|
||||||
FileSplit fileSplit = (FileSplit) splits.get(index);
|
|
||||||
FileSplit legacyFileSplit = (FileSplit) legacySplits.get(index);
|
|
||||||
Assert.assertEquals(fileSplit.getStart(), legacyFileSplit.getStart());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetSplits() throws Exception {
|
public void testGetSplits() throws Exception {
|
||||||
testGetSplits(9);
|
testGetSplits(9);
|
||||||
|
Loading…
Reference in New Issue
Block a user