MAPREDUCE-6793. io.sort.factor code default and mapred-default.xml values inconsistent. Contributed by Gera Shegalov.

This commit is contained in:
Rohith Sharma K S 2016-11-21 22:10:57 +05:30
parent 009452bb6d
commit 683e0c71fe
4 changed files with 17 additions and 3 deletions

View File

@ -967,7 +967,8 @@ public void init(MapOutputCollector.Context context
//sanity checks //sanity checks
final float spillper = final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB,
MRJobConfig.DEFAULT_IO_SORT_MB);
indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT, indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
INDEX_CACHE_MEMORY_LIMIT_DEFAULT); INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
if (spillper > (float)1.0 || spillper <= (float)0.0) { if (spillper > (float)1.0 || spillper <= (float)0.0) {
@ -1920,7 +1921,8 @@ private void mergeParts() throws IOException, InterruptedException,
} }
} }
int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100); int mergeFactor = job.getInt(MRJobConfig.IO_SORT_FACTOR,
MRJobConfig.DEFAULT_IO_SORT_FACTOR);
// sort the segments only if there are intermediate merges // sort the segments only if there are intermediate merges
boolean sortSegments = segmentList.size() > mergeFactor; boolean sortSegments = segmentList.size() > mergeFactor;
//merge //merge

View File

@ -215,8 +215,12 @@ public interface MRJobConfig {
public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor"; public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
public static final int DEFAULT_IO_SORT_FACTOR = 10;
public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb"; public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
public static final int DEFAULT_IO_SORT_MB = 100;
public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes"; public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks"; public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";

View File

@ -175,7 +175,8 @@ public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf,
MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES,
Runtime.getRuntime().maxMemory()) * maxInMemCopyUse); Runtime.getRuntime().maxMemory()) * maxInMemCopyUse);
this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100); this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR,
MRJobConfig.DEFAULT_IO_SORT_FACTOR);
final float singleShuffleMemoryLimitPercent = final float singleShuffleMemoryLimitPercent =
jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, jobConf.getFloat(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT,

View File

@ -207,6 +207,13 @@ public int getNumExceptions() {
} }
} }
@Test
public void testIoSortDefaults() {
final JobConf jobConf = new JobConf();
assertEquals(10, jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100));
assertEquals(100, jobConf.getInt(MRJobConfig.IO_SORT_MB, 10));
}
@SuppressWarnings({ "unchecked", "deprecation" }) @SuppressWarnings({ "unchecked", "deprecation" })
@Test(timeout=10000) @Test(timeout=10000)
public void testOnDiskMerger() throws IOException, URISyntaxException, public void testOnDiskMerger() throws IOException, URISyntaxException,