MAPREDUCE-6067. native-task: fix some counter issues (Binglin Chang)

Binglin Chang 2014-09-05 14:20:39 +08:00
parent 1081d9cee2
commit 00322161b5
19 changed files with 178 additions and 182 deletions

View File

@@ -20,3 +20,4 @@ MAPREDUCE-6054. native-task: Speed up tests (todd)
MAPREDUCE-6058. native-task: KVTest and LargeKVTest should check mr job is sucessful (Binglin Chang)
MAPREDUCE-6056. native-task: move system test working dir to target dir and cleanup test config xml files (Manu Zhang via bchang)
MAPREDUCE-6055. native-task: findbugs, interface annotations, and other misc cleanup (todd)
MAPREDUCE-6067. native-task: fix some counter issues (Binglin Chang)

View File

@@ -34,6 +34,7 @@
import org.apache.hadoop.mapred.nativetask.serde.NativeSerialization;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.util.QuickSort;
/**
@@ -46,6 +47,7 @@ public class NativeMapOutputCollectorDelegator<K, V> implements MapOutputCollect
private JobConf job;
private NativeCollectorOnlyHandler<K, V> handler;
private Context context;
private StatusReportChecker updater;
@Override
@@ -58,6 +60,7 @@ public void close() throws IOException, InterruptedException {
handler.close();
if (null != updater) {
updater.stop();
NativeRuntime.reportStatus(context.getReporter());
}
}
@@ -69,6 +72,7 @@ public void flush() throws IOException, InterruptedException, ClassNotFoundExcep
@SuppressWarnings("unchecked")
@Override
public void init(Context context) throws IOException, ClassNotFoundException {
this.context = context;
this.job = context.getJobConf();
Platforms.init(job);
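
The delegator now keeps the task Context so that close() can push one final counter snapshot via NativeRuntime.reportStatus() after the periodic StatusReportChecker is stopped; without that last flush, increments made after the final heartbeat would be dropped. Below is a minimal, self-contained Java sketch of that stop-then-flush idiom — the scheduled updater and reportStatus() here are simplified stand-ins, not the Hadoop classes:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class FinalFlushSketch {
  // Stand-in for a counter living on the native side.
  static final AtomicLong counter = new AtomicLong();

  // Stand-in for NativeRuntime.reportStatus(reporter): publishes the
  // current counter values to the framework.
  static void reportStatus() {
    System.out.println("reported value = " + counter.get());
  }

  public static void main(String[] args) throws InterruptedException {
    // Stand-in for StatusReportChecker: reports on a fixed interval.
    ScheduledExecutorService updater =
        Executors.newSingleThreadScheduledExecutor();
    updater.scheduleAtFixedRate(
        FinalFlushSketch::reportStatus, 0, 100, TimeUnit.MILLISECONDS);

    for (int i = 0; i < 1_000_000; i++) {
      counter.incrementAndGet(); // work happening between heartbeats
    }

    // close(): stop periodic reporting first, then flush once more so the
    // increments since the last heartbeat are not lost.
    updater.shutdown();
    updater.awaitTermination(1, TimeUnit.SECONDS);
    reportStatus();
  }
}
```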

View File

@@ -76,12 +76,7 @@ protected void initUsedCounters() {
reporter.getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
reporter.getCounter(TaskCounter.SPILLED_RECORDS);
reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
}
public synchronized void start() {
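
initUsedCounters() touches each counter once up front, which forces the corresponding counter objects into the task's counter map before the native side starts sending updates, so later lookups never miss. A sketch of that touch-to-register idiom, using a plain map-backed registry as a stand-in for Hadoop's Reporter:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class CounterRegistry {
  private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

  // Lookup-or-create, mirroring reporter.getCounter(TaskCounter.X):
  // the first touch materializes the counter with value 0.
  AtomicLong getCounter(String name) {
    return counters.computeIfAbsent(name, k -> new AtomicLong());
  }

  // Pre-register everything that will be updated later, so readers and
  // remote updaters never observe a missing entry.
  void initUsedCounters() {
    getCounter("MAP_OUTPUT_RECORDS");
    getCounter("MAP_OUTPUT_BYTES");
    getCounter("MAP_OUTPUT_MATERIALIZED_BYTES");
    getCounter("COMBINE_INPUT_RECORDS");
    getCounter("COMBINE_OUTPUT_RECORDS");
  }
}
```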

View File

@@ -18,6 +18,7 @@
#include "commons.h"
#include "util/StringUtil.h"
#include "lib/TaskCounters.h"
#include "MCollectorOutputHandler.h"
#include "NativeObjectFactory.h"
#include "MapOutputCollector.h"
@@ -94,4 +95,4 @@ KVBuffer * MCollectorOutputHandler::allocateKVBuffer(uint32_t partitionId, uint3
return dest;
}
} //namespace
} // namespace NativeTask

View File

@@ -98,7 +98,7 @@ IFileWriter * IFileWriter::create(const std::string & filepath, const MapOutputS
IFileWriter::IFileWriter(OutputStream * stream, ChecksumType checksumType, KeyValueType ktype,
KeyValueType vtype, const string & codec, Counter * counter, bool deleteTargetStream)
: _stream(stream), _dest(NULL), _checksumType(checksumType), _kType(ktype), _vType(vtype),
_codec(codec), _recordCounter(counter), _deleteTargetStream(deleteTargetStream) {
_codec(codec), _recordCounter(counter), _recordCount(0), _deleteTargetStream(deleteTargetStream) {
_dest = new ChecksumOutputStream(_stream, _checksumType);
_appendBuffer.init(128 * 1024, _dest, _codec);
}
@@ -184,6 +184,7 @@ void IFileWriter::write(const char * key, uint32_t keyLen, const char * value, u
if (NULL != _recordCounter) {
_recordCounter->increase();
}
_recordCount++;
switch (_vType) {
case TextType:
@@ -214,7 +215,7 @@ SingleSpillInfo * IFileWriter::getSpillInfo() {
_codec);
}
void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset) {
void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset, uint64_t & recordCount) {
if (_spillFileSegments.size() > 0) {
offset = _spillFileSegments[_spillFileSegments.size() - 1].uncompressedEndOffset;
realOffset = _spillFileSegments[_spillFileSegments.size() - 1].realEndOffset;
@@ -222,6 +223,7 @@ void IFileWriter::getStatistics(uint64_t & offset, uint64_t & realOffset) {
offset = 0;
realOffset = 0;
}
recordCount = _recordCount;
}
} // namespace NativeTask
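
IFileWriter now keeps two tallies: the optional, externally owned _recordCounter is still incremented when present, and a new internal _recordCount is always maintained and returned through the widened getStatistics() signature, so callers such as finalSpill() can log record totals even when no counter was wired in. A rough Java analogue of that dual-accounting pattern (Java has no out-parameters, so the sketch returns a small holder object instead; all types here are stand-ins):

```java
import java.util.concurrent.atomic.AtomicLong;

class SketchWriter {
  private final AtomicLong externalCounter; // optional, like _recordCounter
  private long recordCount;                 // always kept, like _recordCount
  private long bytesWritten;

  SketchWriter(AtomicLong externalCounter) { // pass null for "no counter"
    this.externalCounter = externalCounter;
  }

  void write(byte[] key, byte[] value) {
    if (externalCounter != null) {
      externalCounter.incrementAndGet(); // framework-visible counter
    }
    recordCount++;                       // internal statistic, always kept
    bytesWritten += key.length + value.length;
  }

  // Counterpart of getStatistics(offset, realOffset, recordCount): return
  // an immutable holder instead of filling out-parameters.
  record Stats(long bytesWritten, long recordCount) {}

  Stats getStatistics() {
    return new Stats(bytesWritten, recordCount);
  }
}
```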

View File

@@ -129,6 +129,7 @@ protected:
AppendBuffer _appendBuffer;
vector<IFileSegment> _spillFileSegments;
Counter * _recordCounter;
uint64_t _recordCount;
bool _deleteTargetStream;
@@ -153,7 +154,7 @@ public:
SingleSpillInfo * getSpillInfo();
void getStatistics(uint64_t & offset, uint64_t & realOffset);
void getStatistics(uint64_t & offset, uint64_t & realOffset, uint64_t & recordCount);
virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
write((const char*)key, keyLen, (const char*)value, valueLen);

View File

@@ -76,9 +76,11 @@ void CombineRunnerWrapper::combine(CombineContext type, KVIterator * kvIterator,
/////////////////////////////////////////////////////////////////
MapOutputCollector::MapOutputCollector(uint32_t numberPartitions, SpillOutputService * spillService)
: _config(NULL), _numPartitions(numberPartitions), _buckets(NULL), _keyComparator(NULL),
_combineRunner(NULL), _spilledRecords(NULL), _spillOutput(spillService),
_defaultBlockSize(0), _pool(NULL) {
: _config(NULL), _numPartitions(numberPartitions), _buckets(NULL),
_keyComparator(NULL), _combineRunner(NULL),
_mapOutputRecords(NULL), _mapOutputBytes(NULL),
_mapOutputMaterializedBytes(NULL), _spilledRecords(NULL),
_spillOutput(spillService), _defaultBlockSize(0), _pool(NULL) {
_pool = new MemoryPool();
}
@@ -108,7 +110,7 @@ MapOutputCollector::~MapOutputCollector() {
}
void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t memoryCapacity,
ComparatorPtr keyComparator, Counter * spilledRecords, ICombineRunner * combiner) {
ComparatorPtr keyComparator, ICombineRunner * combiner) {
this->_combineRunner = combiner;
@@ -128,7 +130,15 @@ void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t memoryCapacity
_buckets[partitionId] = pb;
}
_spilledRecords = spilledRecords;
_mapOutputRecords = NativeObjectFactory::GetCounter(
TaskCounters::TASK_COUNTER_GROUP, TaskCounters::MAP_OUTPUT_RECORDS);
_mapOutputBytes = NativeObjectFactory::GetCounter(
TaskCounters::TASK_COUNTER_GROUP, TaskCounters::MAP_OUTPUT_BYTES);
_mapOutputMaterializedBytes = NativeObjectFactory::GetCounter(
TaskCounters::TASK_COUNTER_GROUP,
TaskCounters::MAP_OUTPUT_MATERIALIZED_BYTES);
_spilledRecords = NativeObjectFactory::GetCounter(
TaskCounters::TASK_COUNTER_GROUP, TaskCounters::SPILLED_RECORDS);
_collectTimer.reset();
}
@@ -155,9 +165,6 @@ void MapOutputCollector::configure(Config * config) {
ComparatorPtr comparator = getComparator(config, _spec);
Counter * spilledRecord = NativeObjectFactory::GetCounter(TaskCounters::TASK_COUNTER_GROUP,
TaskCounters::SPILLED_RECORDS);
ICombineRunner * combiner = NULL;
if (NULL != config->get(NATIVE_COMBINER)
// config name for old api and new api
@@ -166,7 +173,7 @@
combiner = new CombineRunnerWrapper(config, _spillOutput);
}
init(defaultBlockSize, capacity, comparator, spilledRecord, combiner);
init(defaultBlockSize, capacity, comparator, combiner);
}
KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) {
@@ -182,7 +189,7 @@ KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t k
if (NULL == spillpath || spillpath->length() == 0) {
THROW_EXCEPTION(IOException, "Illegal(empty) spill files path");
} else {
middleSpill(*spillpath, "");
middleSpill(*spillpath, "", false);
delete spillpath;
}
@@ -193,6 +200,8 @@ KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t k
THROW_EXCEPTION(OutOfMemoryException, "key/value pair larger than io.sort.mb");
}
}
_mapOutputRecords->increase();
_mapOutputBytes->increase(kvlength - KVBuffer::headerLength());
return dest;
}
@@ -272,10 +281,9 @@ void MapOutputCollector::sortPartitions(SortOrder orderType, SortAlgorithm sortT
}
void MapOutputCollector::middleSpill(const std::string & spillOutput,
const std::string & indexFilePath) {
const std::string & indexFilePath, bool final) {
uint64_t collecttime = _collectTimer.now() - _collectTimer.last();
const uint64_t M = 1000000; //million
if (spillOutput.empty()) {
THROW_EXCEPTION(IOException, "MapOutputCollector: Spill file path empty");
@@ -293,10 +301,24 @@ void MapOutputCollector::middleSpill(const std::string & spillOutput,
info->path = spillOutput;
uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime;
LOG(
"[MapOutputCollector::mid_spill] Sort and spill: {spilled file path: %s, id: %d, collect: %"PRIu64" ms, sort: %"PRIu64" ms, spill: %"PRIu64" ms, records: %"PRIu64", uncompressed total bytes: %"PRIu64", compressed total bytes: %"PRIu64"}",
info->path.c_str(), _spillInfos.getSpillCount(), collecttime / M, metrics.sortTime / M, spillTime / M,
metrics.recordCount, info->getEndPosition(), info->getRealEndPosition());
const uint64_t M = 1000000; //million
LOG("%s-spill: { id: %d, collect: %"PRIu64" ms, "
"in-memory sort: %"PRIu64" ms, in-memory records: %"PRIu64", "
"merge&spill: %"PRIu64" ms, uncompressed size: %"PRIu64", "
"real size: %"PRIu64" path: %s }",
final ? "Final" : "Mid",
_spillInfos.getSpillCount(),
collecttime / M,
metrics.sortTime / M,
metrics.recordCount,
spillTime / M,
info->getEndPosition(),
info->getRealEndPosition(),
spillOutput.c_str());
if (final) {
_mapOutputMaterializedBytes->increase(info->getRealEndPosition());
}
if (indexFilePath.length() > 0) {
info->writeSpillInfo(indexFilePath);
@@ -320,11 +342,8 @@ void MapOutputCollector::middleSpill(const std::string & spillOutput,
void MapOutputCollector::finalSpill(const std::string & filepath,
const std::string & idx_file_path) {
const uint64_t M = 1000000; //million
LOG("[MapOutputCollector::final_merge_and_spill] Spilling file path: %s", filepath.c_str());
if (_spillInfos.getSpillCount() == 0) {
middleSpill(filepath, idx_file_path);
middleSpill(filepath, idx_file_path, true);
return;
}
@@ -339,16 +358,32 @@ void MapOutputCollector::finalSpill(const std::string & filepath,
SortMetrics metrics;
sortPartitions(_spec.sortOrder, _spec.sortAlgorithm, NULL, metrics);
LOG("[MapOutputCollector::mid_spill] Sort final in memory kvs: {sort: %"PRIu64" ms, records: %"PRIu64"}",
metrics.sortTime / M, metrics.recordCount);
merger->addMergeEntry(new MemoryMergeEntry(_buckets, _numPartitions));
Timer timer;
merger->merge();
LOG(
"[MapOutputCollector::final_merge_and_spill] Merge and Spill:{spilled file id: %d, merge and spill time: %"PRIu64" ms}",
_spillInfos.getSpillCount(), (timer.now() - timer.last()) / M);
uint64_t outputSize;
uint64_t realOutputSize;
uint64_t recordCount;
writer->getStatistics(outputSize, realOutputSize, recordCount);
const uint64_t M = 1000000; //million
LOG("Final-merge-spill: { id: %d, in-memory sort: %"PRIu64" ms, "
"in-memory records: %"PRIu64", merge&spill: %"PRIu64" ms, "
"records: %"PRIu64", uncompressed size: %"PRIu64", "
"real size: %"PRIu64" path: %s }",
_spillInfos.getSpillCount(),
metrics.sortTime / M,
metrics.recordCount,
(timer.now() - timer.last()) / M,
recordCount,
outputSize,
realOutputSize,
filepath.c_str());
_mapOutputMaterializedBytes->increase(realOutputSize);
delete merger;
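
Taken together, the counter arithmetic above is: every allocated key/value buffer adds one to MAP_OUTPUT_RECORDS and kvlength - KVBuffer::headerLength() (the payload, excluding the buffer header) to MAP_OUTPUT_BYTES, while MAP_OUTPUT_MATERIALIZED_BYTES grows only once real output is written — in middleSpill when final is true, or after the final merge. A toy Java walk-through of the same bookkeeping; the 8-byte header is an illustrative assumption, not taken from KVBuffer:

```java
public class CounterArithmeticSketch {
  // Assumed header size for illustration only (e.g. two uint32 length
  // fields); the real value comes from KVBuffer::headerLength().
  static final int HEADER_LEN = 8;

  public static void main(String[] args) {
    long mapOutputRecords = 0;
    long mapOutputBytes = 0;
    long mapOutputMaterializedBytes = 0;

    // Three collected key/value buffers, header included in each length.
    int[] kvLengths = {HEADER_LEN + 20, HEADER_LEN + 35, HEADER_LEN + 11};
    for (int kvLength : kvLengths) {
      mapOutputRecords++;                      // one record per buffer
      mapOutputBytes += kvLength - HEADER_LEN; // payload bytes only
    }

    // Charged once, when the final spill/merge writes the real output file
    // (a compressed size, so typically smaller than mapOutputBytes).
    long finalOutputRealSize = 40;
    mapOutputMaterializedBytes += finalOutputRealSize;

    System.out.printf("records=%d, bytes=%d, materialized=%d%n",
        mapOutputRecords, mapOutputBytes, mapOutputMaterializedBytes);
  }
}
```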

View File

@@ -85,7 +85,11 @@ private:
ICombineRunner * _combineRunner;
Counter * _mapOutputRecords;
Counter * _mapOutputBytes;
Counter * _mapOutputMaterializedBytes;
Counter * _spilledRecords;
SpillOutputService * _spillOutput;
uint32_t _defaultBlockSize;
@@ -118,7 +122,7 @@ public:
private:
void init(uint32_t maxBlockSize, uint32_t memory_capacity, ComparatorPtr keyComparator,
Counter * spilledRecord, ICombineRunner * combiner);
ICombineRunner * combiner);
void reset();
@@ -149,7 +153,7 @@ private:
* normal spill use options in _config
* @param filepaths: spill file path
*/
void middleSpill(const std::string & spillOutput, const std::string & indexFilePath);
void middleSpill(const std::string & spillOutput, const std::string & indexFilePath, bool final);
/**
* final merge and/or spill use options in _config, and

View File

@@ -131,7 +131,6 @@ bool Merger::next(Buffer & key, Buffer & value) {
}
void Merger::merge() {
Timer timer;
uint64_t total_record = 0;
_heap.reserve(_entries.size());
MergeEntryPtr * base = &(_heap[0]);
@@ -153,29 +152,6 @@ void Merger::merge() {
}
endPartition();
}
uint64_t interval = (timer.now() - timer.last());
uint64_t M = 1000000; //1 million
uint64_t output_size;
uint64_t real_output_size;
_writer->getStatistics(output_size, real_output_size);
if (total_record != 0) {
LOG("[Merge] Merged segment#: %lu, record#: %"PRIu64", avg record size: %"PRIu64", uncompressed total bytes: %"PRIu64", compressed total bytes: %"PRIu64", time: %"PRIu64" ms",
_entries.size(),
total_record,
output_size / (total_record),
output_size,
real_output_size,
interval / M);
} else {
LOG("[Merge] Merged segments#, %lu, uncompressed total bytes: %"PRIu64", compressed total bytes: %"PRIu64", time: %"PRIu64" ms",
_entries.size(),
output_size,
real_output_size,
interval / M);
}
}
} // namespace NativeTask

View File

@@ -117,8 +117,6 @@ public:
memBlock = new MemoryBlock(buff, allocated);
_memBlocks.push_back(memBlock);
return memBlock->allocateKVBuffer(kvLength);
} else {
LOG("MemoryPool is full, fail to allocate new MemBlock, block size: %d, kv length: %d", expect, kvLength);
}
}
return NULL;

View File

@@ -22,22 +22,14 @@ namespace NativeTask {
#define DEFINE_COUNTER(name) const char * TaskCounters::name = #name;
const char * TaskCounters::TASK_COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
const char * TaskCounters::TASK_COUNTER_GROUP = "org.apache.hadoop.mapreduce.TaskCounter";
DEFINE_COUNTER(MAP_INPUT_RECORDS)
DEFINE_COUNTER(MAP_OUTPUT_RECORDS)
DEFINE_COUNTER(MAP_SKIPPED_RECORDS)
DEFINE_COUNTER(MAP_INPUT_BYTES)
DEFINE_COUNTER(MAP_OUTPUT_BYTES)
DEFINE_COUNTER(MAP_OUTPUT_MATERIALIZED_BYTES)
DEFINE_COUNTER(COMBINE_INPUT_RECORDS)
DEFINE_COUNTER(COMBINE_OUTPUT_RECORDS)
DEFINE_COUNTER(REDUCE_INPUT_GROUPS)
DEFINE_COUNTER(REDUCE_SHUFFLE_BYTES)
DEFINE_COUNTER(REDUCE_INPUT_RECORDS)
DEFINE_COUNTER(REDUCE_OUTPUT_RECORDS)
DEFINE_COUNTER(REDUCE_SKIPPED_GROUPS)
DEFINE_COUNTER(REDUCE_SKIPPED_RECORDS)
DEFINE_COUNTER(SPILLED_RECORDS)
const char * TaskCounters::FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";
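
Pointing TASK_COUNTER_GROUP at org.apache.hadoop.mapreduce.TaskCounter instead of the old org.apache.hadoop.mapred.Task$Counter group means native increments land in the same group the Java framework uses, so they aggregate with the standard task counters and can be read back through the normal API. A minimal read-side sketch (assumes a completed Job handle):

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;

public class ReadNativeCounters {
  // Reads the map-side counters that the native collector now updates
  // under the standard TaskCounter group.
  static void printMapCounters(Job completedJob) throws Exception {
    long records = completedJob.getCounters()
        .findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue();
    long bytes = completedJob.getCounters()
        .findCounter(TaskCounter.MAP_OUTPUT_BYTES).getValue();
    System.out.println("map output: " + records + " records, "
        + bytes + " bytes");
  }
}
```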

View File

@@ -27,18 +27,10 @@ public:
static const char * MAP_INPUT_RECORDS;
static const char * MAP_OUTPUT_RECORDS;
static const char * MAP_SKIPPED_RECORDS;
static const char * MAP_INPUT_BYTES;
static const char * MAP_OUTPUT_BYTES;
static const char * MAP_OUTPUT_MATERIALIZED_BYTES;
static const char * COMBINE_INPUT_RECORDS;
static const char * COMBINE_OUTPUT_RECORDS;
static const char * REDUCE_INPUT_GROUPS;
static const char * REDUCE_SHUFFLE_BYTES;
static const char * REDUCE_INPUT_RECORDS;
static const char * REDUCE_OUTPUT_RECORDS;
static const char * REDUCE_SKIPPED_GROUPS;
static const char * REDUCE_SKIPPED_RECORDS;
static const char * SPILLED_RECORDS;
static const char * FILESYSTEM_COUNTER_GROUP;

View File

@@ -63,15 +63,9 @@ public void testWordCountCombiner() throws Exception {
final Job normaljob = getJob("normalwordcount", commonConf, inputpath, hadoopoutputpath);
assertTrue(nativejob.waitForCompletion(true));
Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
assertTrue(normaljob.waitForCompletion(true));
Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
assertEquals(true, ResultVerifier.verify(nativeoutputpath, hadoopoutputpath));
assertEquals("Native Reduce reduce group counter should equal orignal reduce group counter",
nativeReduceGroups.getValue(), normalReduceGroups.getValue());
ResultVerifier.verifyCounters(normaljob, nativejob, true);
}
@Before

View File

@@ -35,6 +35,7 @@
import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.junit.AfterClass;
import org.apache.hadoop.util.NativeCodeLoader;
import org.junit.Assume;
@@ -87,10 +88,8 @@ public void testLargeValueCombiner() throws Exception {
final Job nativejob = CombinerTest.getJob("nativewordcount", nativeConf, inputPath, nativeOutputPath);
assertTrue(nativejob.waitForCompletion(true));
Counter nativeReduceGroups = nativejob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
assertTrue(normaljob.waitForCompletion(true));
Counter normalReduceGroups = normaljob.getCounters().findCounter(Task.Counter.REDUCE_INPUT_RECORDS);
final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath);
@@ -98,8 +97,7 @@
+ ", max size: " + max + ", normal out: " + hadoopOutputPath + ", native Out: " + nativeOutputPath;
assertEquals(reason, true, compareRet);
// assertEquals("Native Reduce reduce group counter should equal orignal reduce group counter",
// nativeReduceGroups.getValue(), normalReduceGroups.getValue());
ResultVerifier.verifyCounters(normaljob, nativejob, true);
}
fs.close();
}

View File

@@ -69,6 +69,7 @@ public void testSnappyCompress() throws Exception {
final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath);
assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
ResultVerifier.verifyCounters(hadoopjob, job);
}
@Test
@@ -91,6 +92,7 @@ public void testGzipCompress() throws Exception {
final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath);
assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
ResultVerifier.verifyCounters(hadoopjob, job);
}
@Test
@@ -112,6 +114,7 @@ public void testLz4Compress() throws Exception {
assertTrue(hadoopJob.waitForCompletion(true));
final boolean compareRet = ResultVerifier.verify(nativeOutputPath, hadoopOutputPath);
assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
ResultVerifier.verifyCounters(hadoopJob, nativeJob);
}
@Before

View File

@@ -114,21 +114,34 @@ public void startUp() throws Exception {
@Test
public void testKVCompability() throws Exception {
final String nativeoutput = this.runNativeTest(
"Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
final String normaloutput = this.runNormalTest(
"Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
final boolean compareRet = ResultVerifier.verify(normaloutput, nativeoutput);
final String input = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/"
+ keyclass.getName() + "/" + valueclass.getName();
if(compareRet){
final FileSystem fs = FileSystem.get(hadoopkvtestconf);
fs.delete(new Path(nativeoutput), true);
fs.delete(new Path(normaloutput), true);
fs.delete(new Path(input), true);
fs.close();
}
assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
final FileSystem fs = FileSystem.get(nativekvtestconf);
final String jobName = "Test:" + keyclass.getSimpleName() + "--"
+ valueclass.getSimpleName();
final String inputPath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/"
+ keyclass.getName() + "/" + valueclass.getName();
final String nativeOutputPath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR
+ "/" + keyclass.getName() + "/" + valueclass.getName();
// if the output file exists, delete it
fs.delete(new Path(nativeOutputPath), true);
nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
final KVJob nativeJob = new KVJob(jobName, nativekvtestconf, keyclass,
valueclass, inputPath, nativeOutputPath);
assertTrue("job should complete successfully", nativeJob.runJob());
final String normalOutputPath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR
+ "/" + keyclass.getName() + "/" + valueclass.getName();
// if the output file exists, delete it
fs.delete(new Path(normalOutputPath), true);
hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
final KVJob normalJob = new KVJob(jobName, hadoopkvtestconf, keyclass,
valueclass, inputPath, normalOutputPath);
assertTrue("job should complete successfully", normalJob.runJob());
final boolean compareRet = ResultVerifier.verify(normalOutputPath,
nativeOutputPath);
assertEquals("job output not the same", true, compareRet);
ResultVerifier.verifyCounters(normalJob.job, nativeJob.job);
fs.close();
}
@AfterClass
@@ -137,35 +150,4 @@ public static void cleanUp() throws IOException {
fs.delete(new Path(TestConstants.NATIVETASK_KVTEST_DIR), true);
fs.close();
}
private String runNativeTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws Exception {
final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/"
+ keyclass.getName() + "/" + valueclass.getName();
final String outputpath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR + "/"
+ keyclass.getName() + "/" + valueclass.getName();
// if output file exists ,then delete it
final FileSystem fs = FileSystem.get(nativekvtestconf);
fs.delete(new Path(outputpath));
fs.close();
nativekvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "true");
final KVJob keyJob = new KVJob(jobname, nativekvtestconf, keyclass, valueclass, inputpath, outputpath);
assertTrue("job should complete successfully", keyJob.runJob());
return outputpath;
}
private String runNormalTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws Exception {
final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/"
+ keyclass.getName() + "/" + valueclass.getName();
final String outputpath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR + "/"
+ keyclass.getName() + "/" + valueclass.getName();
// if output file exists ,then delete it
final FileSystem fs = FileSystem.get(hadoopkvtestconf);
fs.delete(new Path(outputpath));
fs.close();
hadoopkvtestconf.set(TestConstants.NATIVETASK_KVTEST_CREATEFILE, "false");
final KVJob keyJob = new KVJob(jobname, hadoopkvtestconf, keyclass, valueclass, inputpath, outputpath);
assertTrue("job should complete successfully", keyJob.runJob());
return outputpath;
}
}

View File

@@ -78,11 +78,13 @@ public void runKVSizeTests(Class<?> keyClass, Class<?> valueClass) throws Except
if (!keyClass.equals(Text.class) && !valueClass.equals(Text.class)) {
return;
}
final int deafult_KVSize_Maximum = 1 << 22; // 4M
final int KVSize_Maximu = normalConf.getInt(TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
deafult_KVSize_Maximum);
final int defaultKVSizeMaximum = 1 << 22; // 4M
final int kvSizeMaximum = normalConf.getInt(
TestConstants.NATIVETASK_KVSIZE_MAX_LARGEKV_TEST,
defaultKVSizeMaximum);
final FileSystem fs = FileSystem.get(normalConf);
for (int i = 65536; i <= KVSize_Maximu; i *= 4) {
for (int i = 65536; i <= kvSizeMaximum; i *= 4) {
int min = i / 4;
int max = i;
nativeConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
@@ -90,48 +92,40 @@ public void runKVSizeTests(Class<?> keyClass, Class<?> valueClass) throws Except
normalConf.set(TestConstants.NATIVETASK_KVSIZE_MIN, String.valueOf(min));
normalConf.set(TestConstants.NATIVETASK_KVSIZE_MAX, String.valueOf(max));
LOG.info("===KV Size Test: min size: " + min + ", max size: " + max + ", keyClass: "
+ keyClass.getName() + ", valueClass: " + valueClass.getName());
LOG.info("===KV Size Test: min size: " + min + ", max size: " + max
+ ", keyClass: " + keyClass.getName() + ", valueClass: "
+ valueClass.getName());
final String nativeOutPut = runNativeLargeKVTest("Test Large Value Size:" + String.valueOf(i), keyClass,
valueClass, nativeConf);
final String normalOutPut = this.runNormalLargeKVTest("Test Large Key Size:" + String.valueOf(i), keyClass,
valueClass, normalConf);
final boolean compareRet = ResultVerifier.verify(normalOutPut, nativeOutPut);
final String reason = "keytype: " + keyClass.getName() + ", valuetype: " + valueClass.getName()
+ ", failed with " + (keyClass.equals(Text.class) ? "key" : "value") + ", min size: " + min
+ ", max size: " + max + ", normal out: " + normalOutPut + ", native Out: " + nativeOutPut;
final String inputPath = TestConstants.NATIVETASK_KVTEST_INPUTDIR
+ "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName();
final String nativeOutputPath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR
+ "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName();
// if the output file exists, delete it
fs.delete(new Path(nativeOutputPath), true);
final KVJob nativeJob = new KVJob("Test Large Value Size:"
+ String.valueOf(i), nativeConf, keyClass, valueClass, inputPath,
nativeOutputPath);
assertTrue("job should complete successfully", nativeJob.runJob());
final String normalOutputPath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR
+ "/LargeKV/" + keyClass.getName() + "/" + valueClass.getName();
// if the output file exists, delete it
fs.delete(new Path(normalOutputPath), true);
final KVJob normalJob = new KVJob("Test Large Key Size:" + String.valueOf(i),
normalConf, keyClass, valueClass, inputPath, normalOutputPath);
assertTrue("job should complete successfully", normalJob.runJob());
final boolean compareRet = ResultVerifier.verify(normalOutputPath,
nativeOutputPath);
final String reason = "keytype: " + keyClass.getName() + ", valuetype: "
+ valueClass.getName() + ", failed with "
+ (keyClass.equals(Text.class) ? "key" : "value") + ", min size: " + min
+ ", max size: " + max + ", normal out: " + normalOutputPath
+ ", native Out: " + nativeOutputPath;
assertEquals(reason, true, compareRet);
ResultVerifier.verifyCounters(normalJob.job, nativeJob.job);
}
}
private String runNativeLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
throws Exception {
final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/LargeKV/"
+ keyclass.getName() + "/" + valueclass.getName();
final String outputpath = TestConstants.NATIVETASK_KVTEST_NATIVE_OUTPUTDIR + "/LargeKV/"
+ keyclass.getName() + "/" + valueclass.getName();
// if output file exists ,then delete it
final FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(outputpath), true);
fs.close();
final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
assertTrue("job should complete successfully", keyJob.runJob());
return outputpath;
}
private String runNormalLargeKVTest(String jobname, Class<?> keyclass, Class<?> valueclass, Configuration conf)
throws Exception {
final String inputpath = TestConstants.NATIVETASK_KVTEST_INPUTDIR + "/LargeKV/"
+ keyclass.getName() + "/" + valueclass.getName();
final String outputpath = TestConstants.NATIVETASK_KVTEST_NORMAL_OUTPUTDIR + "/LargeKV/"
+ keyclass.getName() + "/" + valueclass.getName();
// if output file exists ,then delete it
final FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(outputpath), true);
fs.close();
final KVJob keyJob = new KVJob(jobname, conf, keyclass, valueclass, inputpath, outputpath);
assertTrue("job should complete successfully", keyJob.runJob());
return outputpath;
}
}

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.mapred.nativetask.nonsorttest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -53,19 +54,20 @@ public void nonSortTest() throws Exception {
final Job nativeNonSort = getJob(nativeConf, "NativeNonSort",
TestConstants.NATIVETASK_NONSORT_TEST_INPUTDIR,
TestConstants.NATIVETASK_NONSORT_TEST_NATIVE_OUTPUT);
nativeNonSort.waitForCompletion(true);
assertTrue(nativeNonSort.waitForCompletion(true));
Configuration normalConf = ScenarioConfiguration.getNormalConfiguration();
normalConf.addResource(TestConstants.NONSORT_TEST_CONF);
final Job hadoopWithSort = getJob(normalConf, "NormalJob",
TestConstants.NATIVETASK_NONSORT_TEST_INPUTDIR,
TestConstants.NATIVETASK_NONSORT_TEST_NORMAL_OUTPUT);
hadoopWithSort.waitForCompletion(true);
assertTrue(hadoopWithSort.waitForCompletion(true));
final boolean compareRet = ResultVerifier.verify(
TestConstants.NATIVETASK_NONSORT_TEST_NATIVE_OUTPUT,
TestConstants.NATIVETASK_NONSORT_TEST_NORMAL_OUTPUT);
assertEquals("file compare result: if they are the same ,then return true", true, compareRet);
ResultVerifier.verifyCounters(hadoopWithSort, nativeNonSort);
}
@Before

View File

@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.mapred.nativetask.testutil;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.zip.CRC32;
@@ -25,6 +27,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskCounter;
public class ResultVerifier {
/**
@@ -136,6 +141,23 @@ public static boolean verify(String sample, String source) throws Exception {
return true;
}
public static void main(String[] args) {
public static void verifyCounters(Job normalJob, Job nativeJob, boolean hasCombiner) throws IOException {
Counters normalCounters = normalJob.getCounters();
Counters nativeCounters = nativeJob.getCounters();
assertEquals("Counter MAP_OUTPUT_RECORDS should be equal",
normalCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue(),
nativeCounters.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getValue());
assertEquals("Counter REDUCE_INPUT_GROUPS should be equal",
normalCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue(),
nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_GROUPS).getValue());
if (!hasCombiner) {
assertEquals("Counter REDUCE_INPUT_RECORDS should be equal",
normalCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue(),
nativeCounters.findCounter(TaskCounter.REDUCE_INPUT_RECORDS).getValue());
}
}
public static void verifyCounters(Job normalJob, Job nativeJob) throws IOException {
verifyCounters(normalJob, nativeJob, false);
}
}
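
A note on the hasCombiner flag: when a combiner runs, the number of records reaching the reduce side legitimately differs between two otherwise identical jobs, so verifyCounters(..., true) compares only MAP_OUTPUT_RECORDS and REDUCE_INPUT_GROUPS and skips REDUCE_INPUT_RECORDS. A usage sketch mirroring the tests above (assumes JUnit 4 and two configured Job handles):

```java
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
import org.apache.hadoop.mapreduce.Job;

public class VerifyCountersUsage {
  static void runAndCompare(Job normalJob, Job nativeJob, boolean hasCombiner)
      throws Exception {
    assertTrue(normalJob.waitForCompletion(true));
    assertTrue(nativeJob.waitForCompletion(true));
    // With a combiner, REDUCE_INPUT_RECORDS is expected to differ, so only
    // the combiner-insensitive counters are compared.
    ResultVerifier.verifyCounters(normalJob, nativeJob, hasCombiner);
  }
}
```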