MAPREDUCE-6069. Lint/style fixes and removal of unused code. Contributed by Todd Lipcon.

Todd Lipcon 2014-09-03 13:07:24 -07:00
parent 00322161b5
commit 683987be7c
136 changed files with 547 additions and 2336 deletions

View File

@ -21,3 +21,4 @@ MAPREDUCE-6058. native-task: KVTest and LargeKVTest should check mr job is suces
MAPREDUCE-6056. native-task: move system test working dir to target dir and cleanup test config xml files (Manu Zhang via bchang)
MAPREDUCE-6055. native-task: findbugs, interface annotations, and other misc cleanup (todd)
MAPREDUCE-6067. native-task: fix some counter issues (Binglin Chang)
MAPREDUCE-6069. native-task: Lint/style fixes and removal of unused code (todd)

View File

@ -204,28 +204,6 @@
Licenses for third party projects used by this project:
CityHash src/main/native/cityhash
---------------------------------------------------------------------
// Copyright (c) 2011 Google, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
GoogleTest src/main/native/gtest
---------------------------------------------------------------------
Copyright 2008, Google Inc.

View File

@ -178,7 +178,6 @@ configure_file(main/native/test.sh test/test.sh)
add_dual_library(nativetask
${CMAKE_BINARY_DIR}/lz4.c
${D}/cityhash/city.cc
${D}/src/codec/BlockCodec.cc
${D}/src/codec/GzipCodec.cc
${D}/src/codec/Lz4Codec.cc
@ -208,10 +207,8 @@ add_dual_library(nativetask
${D}/src/lib/SpillInfo.cc
${D}/src/lib/Path.cc
${D}/src/lib/Streams.cc
${D}/src/lib/Combiner.cc
${D}/src/lib/TaskCounters.cc
${D}/src/util/Checksum.cc
${D}/src/util/Hash.cc
${D}/src/util/Random.cc
${D}/src/util/StringUtil.cc
${D}/src/util/SyncUtils.cc
@ -237,11 +234,8 @@ add_executable(nttest
${D}/test/lib/TestPartitionBucket.cc
${D}/test/lib/TestReadBuffer.cc
${D}/test/lib/TestReadWriteBuffer.cc
${D}/test/lib/TestTrackingCollector.cc
${D}/test/util/TestChecksum.cc
${D}/test/util/TestHash.cc
${D}/test/util/TestStringUtil.cc
${D}/test/util/TestSyncUtils.cc
${D}/test/util/TestWritableUtils.cc
${D}/test/TestCommand.cc
${D}/test/TestConfig.cc

View File

@ -25,22 +25,9 @@ public class Constants {
public static final String MAP_SORT_CLASS = "map.sort.class";
public static final String MAPRED_COMBINER_CLASS = "mapred.combiner.class";
public static final String MAPRED_MAPTASK_DELEGATOR_CLASS = "mapreduce.map.task.delegator.class";
public static final String MAPRED_REDUCETASK_DELEGATOR_CLASS = "mapreduce.reduce.task.delegator.class";
public static final String NATIVE_TASK_ENABLED = "native.task.enabled";
public static final String NATIVE_LOG_DEVICE = "native.log.device";
public static final String NATIVE_HADOOP_VERSION = "native.hadoop.version";
public static final String NATIVE_MAPPER_CLASS = "native.mapper.class";
public static final String NATIVE_REDUCER_CLASS = "native.reducer.class";
public static final String NATIVE_PARTITIONER_CLASS = "native.partitioner.class";
public static final String NATIVE_COMBINER_CLASS = "native.combiner.class";
public static final String NATIVE_INPUT_SPLIT = "native.input.split";
public static final String NATIVE_RECORDREADER_CLASS = "native.recordreader.class";
public static final String NATIVE_RECORDWRITER_CLASS = "native.recordwriter.class";
public static final String NATIVE_OUTPUT_FILE_NAME = "native.output.file.name";
public static final String NATIVE_PROCESSOR_BUFFER_KB = "native.processor.buffer.kb";
public static final int NATIVE_PROCESSOR_BUFFER_KB_DEFAULT = 64;
public static final int NATIVE_ASYNC_PROCESSOR_BUFFER_KB_DEFAULT = 1024;

View File

@ -32,9 +32,6 @@ public interface DataReceiver {
/**
* Send a signal to the receiver that the data arrives.
* The data is transferred in another band.
*
* @return
* @throws IOException
*/
public boolean receiveData() throws IOException;
}

View File

@ -29,7 +29,6 @@ public interface ICombineHandler {
/**
* run combiner
* @throws IOException
*/
public void combine() throws IOException;
@ -40,7 +39,6 @@ public interface ICombineHandler {
/**
* close handlers, buffer pullers and pushers
* @throws IOException
*/
public void close() throws IOException;
}

View File

@ -45,17 +45,9 @@ public interface INativeHandler extends NativeDataTarget, NativeDataSource {
/**
* call command to downstream
*
* @param command
* @param parameter
* @return
* @throws IOException
*/
public ReadWriteBuffer call(Command command, ReadWriteBuffer parameter) throws IOException;
/**
* @param handler
*/
void setCommandDispatcher(CommandDispatcher handler);
}

View File

@ -229,32 +229,21 @@ private void flushOutput(int length) throws IOException {
/**
* Let native side to process data in inputBuffer
*
* @param handler
* @param length
*/
private native void nativeProcessInput(long handler, int length);
/**
* Notice native side input is finished
*
* @param handler
*/
private native void nativeFinish(long handler);
/**
* Send control message to native side
*
* @param cmd
* command data
* @return return value
*/
private native byte[] nativeCommand(long handler, int cmd, byte[] parameter);
/**
* Load data from native
*
* @return
*/
private native void nativeLoadData(long handler);

View File

@ -95,7 +95,8 @@ public void init(Context context) throws IOException, ClassNotFoundException {
if (!QuickSort.class.getName().equals(job.get(Constants.MAP_SORT_CLASS))) {
String message = "Native-Task doesn't support sort class " + job.get(Constants.MAP_SORT_CLASS);
String message = "Native-Task doesn't support sort class " +
job.get(Constants.MAP_SORT_CLASS);
LOG.error(message);
throw new InvalidJobConfException(message);
}
@ -115,8 +116,8 @@ public void init(Context context) throws IOException, ClassNotFoundException {
LOG.error(message);
throw new InvalidJobConfException(message);
} else if (!Platforms.support(keyCls.getName(), serializer, job)) {
String message = "Native output collector doesn't support this key, this key is not comparable in native "
+ keyCls.getName();
String message = "Native output collector doesn't support this key, " +
"this key is not comparable in native: " + keyCls.getName();
LOG.error(message);
throw new InvalidJobConfException(message);
}
@ -144,7 +145,8 @@ public void init(Context context) throws IOException, ClassNotFoundException {
updater.start();
} else {
String message = "Nativeruntime cannot be loaded, please check the libnativetask.so is in hadoop library dir";
String message = "NativeRuntime cannot be loaded, please check that " +
"libnativetask.so is in hadoop library dir";
LOG.error(message);
throw new InvalidJobConfException(message);
}

View File

@ -79,9 +79,6 @@ public static void configure(Configuration jobConf) {
/**
* create native object We use it to create native handlers
*
* @param clazz
* @return
*/
public synchronized static long createNativeObject(String clazz) {
assertNativeLibraryLoaded();
@ -94,13 +91,11 @@ public synchronized static long createNativeObject(String clazz) {
/**
* Register a customized library
*
* @param clazz
* @return
*/
public synchronized static long registerLibrary(String libraryName, String clazz) {
assertNativeLibraryLoaded();
final long ret = JNIRegisterModule(libraryName.getBytes(Charsets.UTF_8), clazz.getBytes(Charsets.UTF_8));
final long ret = JNIRegisterModule(libraryName.getBytes(Charsets.UTF_8),
clazz.getBytes(Charsets.UTF_8));
if (ret != 0) {
LOG.warn("Can't create NativeObject for class " + clazz + ", probably not exist.");
}
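
As a usage sketch (not part of this patch): a platform registers its native library once, before creating any native handlers, and a non-zero return value means the native class could not be created, as the warning above indicates. The library and class names below are hypothetical, and the enclosing class shown in this diff is assumed to be NativeRuntime in org.apache.hadoop.mapred.nativetask.

// Sketch only: "libmyplatform.so" and "MyPlatformLibrary" are illustrative names.
import org.apache.hadoop.mapred.nativetask.NativeRuntime;

public final class PlatformBootstrap {
  public static void registerNativePlatform() {
    // Assumes the surrounding class in this diff is NativeRuntime.
    long ret = NativeRuntime.registerLibrary("libmyplatform.so", "MyPlatformLibrary");
    if (ret != 0) {
      // Per the warning logged above, a non-zero return means the library
      // or class could not be loaded on the native side.
      throw new RuntimeException("native platform library is not available");
    }
  }
}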
@ -117,9 +112,6 @@ public synchronized static void releaseNativeObject(long addr) {
/**
* Get the status report from native space
*
* @param reporter
* @throws IOException
*/
public static void reportStatus(TaskReporter reporter) throws IOException {
assertNativeLibraryLoaded();
@ -164,40 +156,32 @@ public static void reportStatus(TaskReporter reporter) throws IOException {
/**
* Config the native runtime with mapreduce job configurations.
*
* @param configs
*/
private native static void JNIConfigure(byte[][] configs);
/**
* create a native object in native space
*
* @param clazz
* @return
*/
private native static long JNICreateNativeObject(byte[] clazz);
/**
* create the default native object for certain type
*
* @param type
* @return
*/
@Deprecated
private native static long JNICreateDefaultNativeObject(byte[] type);
/**
* destroy native object in native space
*
* @param addr
*/
private native static void JNIReleaseNativeObject(long addr);
/**
* get status update from native side Encoding: progress:float status:Text Counter number: int the count of the
* counters Counters: array [group:Text, name:Text, incrCount:Long]
*
* @return
* Get status update from native side
* Encoding:
* progress:float
* status:Text
* number: int the count of the counters
* Counters: array [group:Text, name:Text, incrCount:Long]
*/
private native static byte[] JNIUpdateStatus();
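
For reference, the sketch below shows one way a status buffer with the layout documented above could be decoded on the Java side. It assumes the fields are written with standard Hadoop Writable serialization (FloatWritable, Text, IntWritable, LongWritable); the exact framing is an assumption, and the class name is illustrative rather than part of this patch.

// Sketch only: assumes the status buffer uses standard Writable serialization.
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public final class StatusDecoder {
  public static void decode(byte[] statusBytes) throws IOException {
    DataInputBuffer in = new DataInputBuffer();
    in.reset(statusBytes, statusBytes.length);

    FloatWritable progress = new FloatWritable();
    progress.readFields(in);                // progress:float

    Text status = new Text();
    status.readFields(in);                  // status:Text

    IntWritable numCounters = new IntWritable();
    numCounters.readFields(in);             // count of the counters

    for (int i = 0; i < numCounters.get(); i++) {
      Text group = new Text();
      Text name = new Text();
      LongWritable incrCount = new LongWritable();
      group.readFields(in);                 // group:Text
      name.readFields(in);                  // name:Text
      incrCount.readFields(in);             // incrCount:Long
      // A real caller would feed these values into the TaskReporter's counters.
    }
  }
}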

View File

@ -49,8 +49,6 @@ public Platform() {
/**
* initialize a platform, where we should call registerKey
*
* @throws IOException
*/
public abstract void init() throws IOException;
@ -65,7 +63,6 @@ public Platform() {
*
* @param keyClassName map out key class name
* @param key key serializer class
* @throws IOException
*/
protected void registerKey(String keyClassName, Class<?> key) throws IOException {
serialization.register(keyClassName, key);
@ -92,12 +89,12 @@ protected abstract boolean support(String keyClassName,
/**
* whether it's the platform that has defined a custom Java comparator
*
* NativeTask doesn't support custom Java comparator(set with mapreduce.job.output.key.comparator.class)
* but a platform (e.g Pig) could also set that conf and implement native comparators so
* we shouldn't bail out.
* NativeTask doesn't support custom Java comparators
* (set with mapreduce.job.output.key.comparator.class)
* but a platform (e.g Pig) could also set that conf and implement native
* comparators so we shouldn't bail out.
*
* @param keyComparator comparator set with mapreduce.job.output.key.comparator.class
* @return
*/
protected abstract boolean define(Class<?> keyComparator);
}
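
A platform that ships native comparators would answer define() by checking the configured comparator class against the set it also implements natively. The sketch below only illustrates that decision; the class and comparator names are hypothetical, and the surrounding Platform subclass is omitted.

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the check a platform's define() would perform.
// The comparator name is hypothetical; this class is not part of the patch.
public final class NativeComparatorPolicy {
  // Java comparators that the (hypothetical) platform also implements natively.
  private static final Set<String> NATIVE_BACKED = new HashSet<String>(
      Arrays.asList("org.example.pig.NativeBackedKeyComparator"));

  // Mirrors Platform#define: returning true means the configured comparator
  // has a native counterpart, so the native collector does not bail out.
  public static boolean define(Class<?> keyComparator) {
    return NATIVE_BACKED.contains(keyComparator.getName());
  }
}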

View File

@ -46,7 +46,8 @@ public class BufferPullee<IK, IV> implements IDataLoader {
private final NativeDataTarget target;
private boolean closed = false;
public BufferPullee(Class<IK> iKClass, Class<IV> iVClass, RawKeyValueIterator rIter, NativeDataTarget target)
public BufferPullee(Class<IK> iKClass, Class<IV> iVClass,
RawKeyValueIterator rIter, NativeDataTarget target)
throws IOException {
this.rIter = rIter;
tmpInputKey = new SizedWritable<IK>(iKClass);

View File

@ -189,8 +189,6 @@ public Progress getProgress() {
/**
* Closes the iterator so that the underlying streams can be closed.
*
* @throws IOException
*/
@Override
public void close() throws IOException {

View File

@ -51,7 +51,8 @@ public class BufferPushee<OK, OV> implements Closeable {
private KVSerializer<OK, OV> deserializer;
private boolean closed = false;
public BufferPushee(Class<OK> oKClass, Class<OV> oVClass, RecordWriter<OK, OV> writer) throws IOException {
public BufferPushee(Class<OK> oKClass, Class<OV> oVClass,
RecordWriter<OK, OV> writer) throws IOException {
tmpOutputKey = new SizedWritable<OK>(oKClass);
tmpOutputValue = new SizedWritable<OV>(oVClass);
@ -128,7 +129,8 @@ private boolean write(InputBuffer input) throws IOException {
}
}
if (remain != totalRead) {
throw new IOException("We expect to read " + remain + ", but we actually read: " + totalRead);
throw new IOException("We expect to read " + remain +
", but we actually read: " + totalRead);
}
return true;
}

View File

@ -44,7 +44,8 @@ public class BufferPusher<K, V> implements OutputCollector<K, V> {
IKVSerializer serializer;
private boolean closed = false;
public BufferPusher(Class<K> iKClass, Class<V> iVClass, NativeDataTarget target) throws IOException {
public BufferPusher(Class<K> iKClass, Class<V> iVClass,
NativeDataTarget target) throws IOException {
tmpInputKey = new SizedWritable<K>(iKClass);
tmpInputValue = new SizedWritable<V>(iVClass);

View File

@ -49,7 +49,8 @@ class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher {
private final BufferPusher<K, V> kvPusher;
private boolean closed = false;
public static <K, V> ICombineHandler create(TaskContext context) throws IOException, ClassNotFoundException {
public static <K, V> ICombineHandler create(TaskContext context)
throws IOException, ClassNotFoundException {
final JobConf conf = new JobConf(context.getConf());
conf.set(Constants.SERIALIZATION_FRAMEWORK,
String.valueOf(SerializationFramework.WRITABLE_SERIALIZATION.getType()));
@ -66,11 +67,13 @@ public static <K, V> ICombineHandler create(TaskContext context) throws IOExcept
final Counter combineInputCounter = context.getTaskReporter().getCounter(
TaskCounter.COMBINE_INPUT_RECORDS);
final CombinerRunner<K, V> combinerRunner = CombinerRunner.create(conf, context.getTaskAttemptId(),
final CombinerRunner<K, V> combinerRunner = CombinerRunner.create(
conf, context.getTaskAttemptId(),
combineInputCounter, context.getTaskReporter(), null);
final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, conf, DataChannel.INOUT);
final INativeHandler nativeHandler = NativeBatchProcessor.create(
NAME, conf, DataChannel.INOUT);
@SuppressWarnings("unchecked")
final BufferPusher<K, V> pusher = new BufferPusher<K, V>((Class<K>)context.getInputKeyClass(),
(Class<V>)context.getInputValueClass(),
@ -79,8 +82,9 @@ public static <K, V> ICombineHandler create(TaskContext context) throws IOExcept
return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher);
}
public CombinerHandler(INativeHandler nativeHandler, CombinerRunner<K, V> combiner, BufferPuller puller,
BufferPusher<K, V> kvPusher) throws IOException {
public CombinerHandler(INativeHandler nativeHandler, CombinerRunner<K, V> combiner,
BufferPuller puller, BufferPusher<K, V> kvPusher)
throws IOException {
this.nativeHandler = nativeHandler;
this.combinerRunner = combiner;
this.puller = puller;

View File

@ -30,7 +30,6 @@ public interface IDataLoader {
/**
* @return size of data loaded
* @throws IOException
*/
public int load() throws IOException;

View File

@ -63,7 +63,8 @@ public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Clos
private final INativeHandler nativeHandler;
private boolean closed = false;
public static <K, V> NativeCollectorOnlyHandler<K, V> create(TaskContext context) throws IOException {
public static <K, V> NativeCollectorOnlyHandler<K, V> create(TaskContext context)
throws IOException {
ICombineHandler combinerHandler = null;
@ -81,7 +82,8 @@ public static <K, V> NativeCollectorOnlyHandler<K, V> create(TaskContext context
LOG.info("[NativeCollectorOnlyHandler] combiner is not null");
}
final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, context.getConf(), DataChannel.OUT);
final INativeHandler nativeHandler = NativeBatchProcessor.create(
NAME, context.getConf(), DataChannel.OUT);
final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(
(Class<K>)context.getOutputKeyClass(),
(Class<V>)context.getOutputValueClass(),

View File

@ -27,7 +27,8 @@
import org.apache.hadoop.mapred.nativetask.INativeComparable;
@InterfaceAudience.Private
public class BytesWritableSerializer implements INativeComparable, INativeSerializer<BytesWritable> {
public class BytesWritableSerializer
implements INativeComparable, INativeSerializer<BytesWritable> {
@Override
public int getLength(BytesWritable w) throws IOException {

View File

@ -33,43 +33,16 @@ public interface IKVSerializer {
/**
* update the length field of SizedWritable
* @param key
* @param value
* @throws IOException
*/
public void updateLength(SizedWritable<?> key, SizedWritable<?> value) throws IOException;
/**
*
* @param out
* @param key
* @param value
* @return bytes written
* @throws IOException
*/
public int serializeKV(DataOutputStream out, SizedWritable<?> key,
SizedWritable<?> value) throws IOException;
/**
* serialize partitionId as well
* @param out
* @param partitionId
* @param key
* @param value
* @return
* @throws IOException
*/
public int serializePartitionKV(DataOutputStream out, int partitionId,
SizedWritable<?> key, SizedWritable<?> value)
throws IOException;
/**
*
* @param in
* @param key
* @param value
* @return bytes read
* @throws IOException
*/
public int deserializeKV(DataInputStream in, SizedWritable<?> key, SizedWritable<?> value) throws IOException;
public int deserializeKV(DataInputStream in, SizedWritable<?> key, SizedWritable<?> value)
throws IOException;
}

View File

@ -55,7 +55,8 @@ public void updateLength(SizedWritable<?> key, SizedWritable<?> value) throws IO
}
@Override
public int serializeKV(DataOutputStream out, SizedWritable<?> key, SizedWritable<?> value) throws IOException {
public int serializeKV(DataOutputStream out, SizedWritable<?> key, SizedWritable<?> value)
throws IOException {
return serializePartitionKV(out, -1, key, value);
}
@ -64,7 +65,8 @@ public int serializePartitionKV(DataOutputStream out, int partitionId,
SizedWritable<?> key, SizedWritable<?> value)
throws IOException {
if (key.length == SizedWritable.INVALID_LENGTH || value.length == SizedWritable.INVALID_LENGTH) {
if (key.length == SizedWritable.INVALID_LENGTH ||
value.length == SizedWritable.INVALID_LENGTH) {
updateLength(key, value);
}

View File

@ -27,7 +27,8 @@
@InterfaceAudience.Private
public class NativeSerialization {
private final ConcurrentHashMap<String, Class<?>> map = new ConcurrentHashMap<String, Class<?>>();
private final ConcurrentHashMap<String, Class<?>> map =
new ConcurrentHashMap<String, Class<?>>();
public boolean accept(Class<?> c) {
return Writable.class.isAssignableFrom(c);
@ -40,7 +41,8 @@ public INativeSerializer<Writable> getSerializer(Class<?> c) throws IOException
return null;
}
if (!Writable.class.isAssignableFrom(c)) {
throw new IOException("Cannot serialize type " + c.getName() + ", we only accept subclass of Writable");
throw new IOException("Cannot serialize type " + c.getName() +
", we only accept subclass of Writable");
}
final String name = c.getName();
final Class<?> serializer = map.get(name);
@ -70,8 +72,9 @@ public void register(String klass, Class<?> serializer) throws IOException {
return;
} else {
if (!storedSerializer.getName().equals(serializer.getName())) {
throw new IOException("Error! Serializer already registered, exist: " + storedSerializer.getName() + ", new: "
+ serializer.getName());
throw new IOException("Error! Serializer already registered, existing: " +
storedSerializer.getName() + ", new: " +
serializer.getName());
}
}
}

View File

@ -97,7 +97,7 @@ public static double toDouble(final byte [] bytes, final int offset) {
* Write a printable representation of a byte array.
*
* @param b byte array
* @return string
* @return the printable presentation
* @see #toStringBinary(byte[], int, int)
*/
public static String toStringBinary(final byte [] b) {

View File

@ -45,9 +45,6 @@ public LocalJobOutputFiles(Configuration conf, String id) {
/**
* Return the path to local map output file created earlier
*
* @return path
* @throws IOException
*/
public Path getOutputFile() throws IOException {
String path = String.format(OUTPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT);
@ -57,10 +54,7 @@ public Path getOutputFile() throws IOException {
/**
* Create a local map output file name.
*
* @param size
* the size of the file
* @return path
* @throws IOException
* @param size the size of the file
*/
public Path getOutputFileForWrite(long size) throws IOException {
String path = String.format(OUTPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT);
@ -69,9 +63,6 @@ public Path getOutputFileForWrite(long size) throws IOException {
/**
* Return the path to a local map output index file created earlier
*
* @return path
* @throws IOException
*/
public Path getOutputIndexFile() throws IOException {
String path = String.format(OUTPUT_FILE_INDEX_FORMAT_STRING, TASKTRACKER_OUTPUT);
@ -81,10 +72,7 @@ public Path getOutputIndexFile() throws IOException {
/**
* Create a local map output index file name.
*
* @param size
* the size of the file
* @return path
* @throws IOException
* @param size the size of the file
*/
public Path getOutputIndexFileForWrite(long size) throws IOException {
String path = String.format(OUTPUT_FILE_INDEX_FORMAT_STRING, TASKTRACKER_OUTPUT);
@ -94,10 +82,7 @@ public Path getOutputIndexFileForWrite(long size) throws IOException {
/**
* Return a local map spill file created earlier.
*
* @param spillNumber
* the number
* @return path
* @throws IOException
* @param spillNumber the number
*/
public Path getSpillFile(int spillNumber) throws IOException {
String path = String.format(SPILL_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, spillNumber);
@ -107,12 +92,8 @@ public Path getSpillFile(int spillNumber) throws IOException {
/**
* Create a local map spill file name.
*
* @param spillNumber
* the number
* @param size
* the size of the file
* @return path
* @throws IOException
* @param spillNumber the number
* @param size the size of the file
*/
public Path getSpillFileForWrite(int spillNumber, long size) throws IOException {
String path = String.format(SPILL_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, spillNumber);
@ -122,10 +103,7 @@ public Path getSpillFileForWrite(int spillNumber, long size) throws IOException
/**
* Return a local map spill index file created earlier
*
* @param spillNumber
* the number
* @return path
* @throws IOException
* @param spillNumber the number
*/
public Path getSpillIndexFile(int spillNumber) throws IOException {
String path = String
@ -136,12 +114,8 @@ public Path getSpillIndexFile(int spillNumber) throws IOException {
/**
* Create a local map spill index file name.
*
* @param spillNumber
* the number
* @param size
* the size of the file
* @return path
* @throws IOException
* @param spillNumber the number
* @param size the size of the file
*/
public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException {
String path = String
@ -152,10 +126,7 @@ public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOExcep
/**
* Return a local reduce input file created earlier
*
* @param mapId
* a map task id
* @return path
* @throws IOException
* @param mapId a map task id
*/
public Path getInputFile(int mapId) throws IOException {
return lDirAlloc.getLocalPathToRead(
@ -166,14 +137,11 @@ public Path getInputFile(int mapId) throws IOException {
/**
* Create a local reduce input file name.
*
* @param mapId
* a map task id
* @param size
* the size of the file
* @return path
* @throws IOException
* @param mapId a map task id
* @param size the size of the file
*/
public Path getInputFileForWrite(TaskID mapId, long size, Configuration conf) throws IOException {
public Path getInputFileForWrite(TaskID mapId, long size, Configuration conf)
throws IOException {
return lDirAlloc.getLocalPathForWrite(
String.format(REDUCE_INPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, mapId.getId()), size,
conf);

View File

@ -32,103 +32,70 @@ public interface NativeTaskOutput {
/**
* Return the path to local map output file created earlier
*
* @return path
* @throws IOException
*/
public Path getOutputFile() throws IOException;
/**
* Create a local map output file name.
*
* @param size
* the size of the file
* @return path
* @throws IOException
* @param size the size of the file
*/
public Path getOutputFileForWrite(long size) throws IOException;
/**
* Return the path to a local map output index file created earlier
*
* @return path
* @throws IOException
*/
public Path getOutputIndexFile() throws IOException;
/**
* Create a local map output index file name.
*
* @param size
* the size of the file
* @return path
* @throws IOException
* @param size the size of the file
*/
public Path getOutputIndexFileForWrite(long size) throws IOException;
/**
* Return a local map spill file created earlier.
*
* @param spillNumber
* the number
* @return path
* @throws IOException
* @param spillNumber the number
*/
public Path getSpillFile(int spillNumber) throws IOException;
/**
* Create a local map spill file name.
*
* @param spillNumber
* the number
* @param size
* the size of the file
* @return path
* @throws IOException
* @param spillNumber the number
* @param size the size of the file
*/
public Path getSpillFileForWrite(int spillNumber, long size) throws IOException;
/**
* Return a local map spill index file created earlier
*
* @param spillNumber
* the number
* @return path
* @throws IOException
* @param spillNumber the number
*/
public Path getSpillIndexFile(int spillNumber) throws IOException;
/**
* Create a local map spill index file name.
*
* @param spillNumber
* the number
* @param size
* the size of the file
* @return path
* @throws IOException
* @param spillNumber the number
* @param size the size of the file
*/
public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException;
/**
* Return a local reduce input file created earlier
*
* @param mapId
* a map task id
* @return path
* @throws IOException
* @param mapId a map task id
*/
public Path getInputFile(int mapId) throws IOException;
/**
* Create a local reduce input file name.
*
* @param mapId
* a map task id
* @param size
* the size of the file
* @return path
* @throws IOException
* @param mapId a map task id
* @param size the size of the file
*/
public Path getInputFileForWrite(TaskID mapId, long size, Configuration conf) throws IOException;

View File

@ -28,11 +28,13 @@
/**
* Manipulate the working area for the transient store for maps and reduces.
*
* This class is used by map and reduce tasks to identify the directories that they need to write
* to/read from for intermediate files. The callers of these methods are from child space and see
* mapreduce.cluster.local.dir as taskTracker/jobCache/jobId/attemptId This class should not be used
* from TaskTracker space.
*
* This class is used by map and reduce tasks to identify the directories that they need
* to write to/read from for intermediate files. The callers of these methods are from
* child space and see mapreduce.cluster.local.dir as
* taskTracker/jobCache/jobId/attemptId.
*
* This class should not be used from TaskTracker space.
*/
@InterfaceAudience.Private
public class NativeTaskOutputFiles implements NativeTaskOutput {
@ -55,9 +57,6 @@ public NativeTaskOutputFiles(Configuration conf, String id) {
/**
* Return the path to local map output file created earlier
*
* @return path
* @throws IOException
*/
public Path getOutputFile() throws IOException {
String path = String.format(OUTPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, id);
@ -66,11 +65,8 @@ public Path getOutputFile() throws IOException {
/**
* Create a local map output file name.
*
* @param size
* the size of the file
* @return path
* @throws IOException
*
* @param size the size of the file
*/
public Path getOutputFileForWrite(long size) throws IOException {
String path = String.format(OUTPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, id);
@ -79,9 +75,6 @@ public Path getOutputFileForWrite(long size) throws IOException {
/**
* Return the path to a local map output index file created earlier
*
* @return path
* @throws IOException
*/
public Path getOutputIndexFile() throws IOException {
String path = String.format(OUTPUT_FILE_INDEX_FORMAT_STRING, TASKTRACKER_OUTPUT, id);
@ -91,10 +84,7 @@ public Path getOutputIndexFile() throws IOException {
/**
* Create a local map output index file name.
*
* @param size
* the size of the file
* @return path
* @throws IOException
* @param size the size of the file
*/
public Path getOutputIndexFileForWrite(long size) throws IOException {
String path = String.format(OUTPUT_FILE_INDEX_FORMAT_STRING, TASKTRACKER_OUTPUT, id);
@ -104,10 +94,7 @@ public Path getOutputIndexFileForWrite(long size) throws IOException {
/**
* Return a local map spill file created earlier.
*
* @param spillNumber
* the number
* @return path
* @throws IOException
* @param spillNumber the number
*/
public Path getSpillFile(int spillNumber) throws IOException {
String path = String.format(SPILL_FILE_FORMAT_STRING, id, TASKTRACKER_OUTPUT, spillNumber);
@ -117,12 +104,8 @@ public Path getSpillFile(int spillNumber) throws IOException {
/**
* Create a local map spill file name.
*
* @param spillNumber
* the number
* @param size
* the size of the file
* @return path
* @throws IOException
* @param spillNumber the number
* @param size the size of the file
*/
public Path getSpillFileForWrite(int spillNumber, long size) throws IOException {
String path = String.format(SPILL_FILE_FORMAT_STRING, id, TASKTRACKER_OUTPUT, spillNumber);
@ -132,10 +115,7 @@ public Path getSpillFileForWrite(int spillNumber, long size) throws IOException
/**
* Return a local map spill index file created earlier
*
* @param spillNumber
* the number
* @return path
* @throws IOException
* @param spillNumber the number
*/
public Path getSpillIndexFile(int spillNumber) throws IOException {
String path = String
@ -146,12 +126,8 @@ public Path getSpillIndexFile(int spillNumber) throws IOException {
/**
* Create a local map spill index file name.
*
* @param spillNumber
* the number
* @param size
* the size of the file
* @return path
* @throws IOException
* @param spillNumber the number
* @param size the size of the file
*/
public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException {
String path = String
@ -162,10 +138,7 @@ public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOExcep
/**
* Return a local reduce input file created earlier
*
* @param mapId
* a map task id
* @return path
* @throws IOException
* @param mapId a map task id
*/
public Path getInputFile(int mapId) throws IOException {
return lDirAlloc.getLocalPathToRead(
@ -176,14 +149,11 @@ public Path getInputFile(int mapId) throws IOException {
/**
* Create a local reduce input file name.
*
* @param mapId
* a map task id
* @param size
* the size of the file
* @return path
* @throws IOException
* @param mapId a map task id
* @param size the size of the file
*/
public Path getInputFileForWrite(TaskID mapId, long size, Configuration conf) throws IOException {
public Path getInputFileForWrite(TaskID mapId, long size, Configuration conf)
throws IOException {
return lDirAlloc.getLocalPathForWrite(
String.format(REDUCE_INPUT_FILE_FORMAT_STRING, TASKTRACKER_OUTPUT, mapId.getId()), size,
conf);

View File

@ -112,10 +112,15 @@ public int readInt() {
}
public long readLong() {
final long result = ((_buff[_readPoint + 0] & 255) << 0) + ((_buff[_readPoint + 1] & 255) << 8)
+ ((_buff[_readPoint + 2] & 255) << 16) + ((long) (_buff[_readPoint + 3] & 255) << 24)
+ ((long) (_buff[_readPoint + 4] & 255) << 32) + ((long) (_buff[_readPoint + 5] & 255) << 40)
+ ((long) (_buff[_readPoint + 6] & 255) << 48) + (((long) _buff[_readPoint + 7] << 56));
final long result =
((_buff[_readPoint + 0] & 255) << 0) +
((_buff[_readPoint + 1] & 255) << 8) +
((_buff[_readPoint + 2] & 255) << 16) +
((long) (_buff[_readPoint + 3] & 255) << 24) +
((long) (_buff[_readPoint + 4] & 255) << 32) +
((long) (_buff[_readPoint + 5] & 255) << 40) +
((long) (_buff[_readPoint + 6] & 255) << 48) +
(((long) _buff[_readPoint + 7] << 56));
_readPoint += 8;
return result;
@ -144,7 +149,8 @@ private void checkWriteSpaceAndResizeIfNecessary(int toBeWritten) {
if (_buff.length - _writePoint >= toBeWritten) {
return;
}
final int newLength = (toBeWritten + _writePoint > CACHE_LINE_SIZE) ? (toBeWritten + _writePoint) : CACHE_LINE_SIZE;
final int newLength = (toBeWritten + _writePoint > CACHE_LINE_SIZE) ?
(toBeWritten + _writePoint) : CACHE_LINE_SIZE;
final byte[] newBuff = new byte[newLength];
System.arraycopy(_buff, 0, newBuff, 0, _writePoint);
_buff = newBuff;

View File

@ -1,307 +0,0 @@
// Copyright (c) 2011 Google, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
// CityHash Version 1, by Geoff Pike and Jyrki Alakuijala
//
// This file provides CityHash64() and related functions.
//
// It's probably possible to create even faster hash functions by
// writing a program that systematically explores some of the space of
// possible hash functions, by using SIMD instructions, or by
// compromising on hash quality.
#include "city.h"
#include <algorithm>
using namespace std;
#define UNALIGNED_LOAD64(p) (*(const uint64*)(p))
#define UNALIGNED_LOAD32(p) (*(const uint32*)(p))
#if !defined(LIKELY)
#if defined(__GNUC__)
#define LIKELY(x) (__builtin_expect(!!(x), 1))
#else
#define LIKELY(x) (x)
#endif
#endif
// Some primes between 2^63 and 2^64 for various uses.
static const uint64 k0 = 0xc3a5c85c97cb3127ULL;
static const uint64 k1 = 0xb492b66fbe98f273ULL;
static const uint64 k2 = 0x9ae16a3b2f90404fULL;
static const uint64 k3 = 0xc949d7c7509e6557ULL;
// Bitwise right rotate. Normally this will compile to a single
// instruction, especially if the shift is a manifest constant.
static uint64 Rotate(uint64 val, int shift) {
// Avoid shifting by 64: doing so yields an undefined result.
return shift == 0 ? val : ((val >> shift) | (val << (64 - shift)));
}
// Equivalent to Rotate(), but requires the second arg to be non-zero.
// On x86-64, and probably others, it's possible for this to compile
// to a single instruction if both args are already in registers.
static uint64 RotateByAtLeast1(uint64 val, int shift) {
return (val >> shift) | (val << (64 - shift));
}
static uint64 ShiftMix(uint64 val) {
return val ^ (val >> 47);
}
static uint64 HashLen16(uint64 u, uint64 v) {
return Hash128to64(uint128(u, v));
}
static uint64 HashLen0to16(const char *s, size_t len) {
if (len > 8) {
uint64 a = UNALIGNED_LOAD64(s);
uint64 b = UNALIGNED_LOAD64(s + len - 8);
return HashLen16(a, RotateByAtLeast1(b + len, len)) ^ b;
}
if (len >= 4) {
uint64 a = UNALIGNED_LOAD32(s);
return HashLen16(len + (a << 3), UNALIGNED_LOAD32(s + len - 4));
}
if (len > 0) {
uint8 a = s[0];
uint8 b = s[len >> 1];
uint8 c = s[len - 1];
uint32 y = static_cast<uint32>(a) + (static_cast<uint32>(b) << 8);
uint32 z = len + (static_cast<uint32>(c) << 2);
return ShiftMix(y * k2 ^ z * k3) * k2;
}
return k2;
}
// This probably works well for 16-byte strings as well, but it may be overkill
// in that case.
static uint64 HashLen17to32(const char *s, size_t len) {
uint64 a = UNALIGNED_LOAD64(s) * k1;
uint64 b = UNALIGNED_LOAD64(s + 8);
uint64 c = UNALIGNED_LOAD64(s + len - 8) * k2;
uint64 d = UNALIGNED_LOAD64(s + len - 16) * k0;
return HashLen16(Rotate(a - b, 43) + Rotate(c, 30) + d,
a + Rotate(b ^ k3, 20) - c + len);
}
// Return a 16-byte hash for 48 bytes. Quick and dirty.
// Callers do best to use "random-looking" values for a and b.
static pair<uint64, uint64> WeakHashLen32WithSeeds(
uint64 w, uint64 x, uint64 y, uint64 z, uint64 a, uint64 b) {
a += w;
b = Rotate(b + a + z, 21);
uint64 c = a;
a += x;
a += y;
b += Rotate(a, 44);
return make_pair(a + z, b + c);
}
// Return a 16-byte hash for s[0] ... s[31], a, and b. Quick and dirty.
static pair<uint64, uint64> WeakHashLen32WithSeeds(
const char* s, uint64 a, uint64 b) {
return WeakHashLen32WithSeeds(UNALIGNED_LOAD64(s),
UNALIGNED_LOAD64(s + 8),
UNALIGNED_LOAD64(s + 16),
UNALIGNED_LOAD64(s + 24),
a,
b);
}
// Return an 8-byte hash for 33 to 64 bytes.
static uint64 HashLen33to64(const char *s, size_t len) {
uint64 z = UNALIGNED_LOAD64(s + 24);
uint64 a = UNALIGNED_LOAD64(s) + (len + UNALIGNED_LOAD64(s + len - 16)) * k0;
uint64 b = Rotate(a + z, 52);
uint64 c = Rotate(a, 37);
a += UNALIGNED_LOAD64(s + 8);
c += Rotate(a, 7);
a += UNALIGNED_LOAD64(s + 16);
uint64 vf = a + z;
uint64 vs = b + Rotate(a, 31) + c;
a = UNALIGNED_LOAD64(s + 16) + UNALIGNED_LOAD64(s + len - 32);
z = UNALIGNED_LOAD64(s + len - 8);
b = Rotate(a + z, 52);
c = Rotate(a, 37);
a += UNALIGNED_LOAD64(s + len - 24);
c += Rotate(a, 7);
a += UNALIGNED_LOAD64(s + len - 16);
uint64 wf = a + z;
uint64 ws = b + Rotate(a, 31) + c;
uint64 r = ShiftMix((vf + ws) * k2 + (wf + vs) * k0);
return ShiftMix(r * k0 + vs) * k2;
}
uint64 CityHash64(const char *s, size_t len) {
if (len <= 32) {
if (len <= 16) {
return HashLen0to16(s, len);
} else {
return HashLen17to32(s, len);
}
} else if (len <= 64) {
return HashLen33to64(s, len);
}
// For strings over 64 bytes we hash the end first, and then as we
// loop we keep 56 bytes of state: v, w, x, y, and z.
uint64 x = UNALIGNED_LOAD64(s);
uint64 y = UNALIGNED_LOAD64(s + len - 16) ^ k1;
uint64 z = UNALIGNED_LOAD64(s + len - 56) ^ k0;
pair<uint64, uint64> v = WeakHashLen32WithSeeds(s + len - 64, len, y);
pair<uint64, uint64> w = WeakHashLen32WithSeeds(s + len - 32, len * k1, k0);
z += ShiftMix(v.second) * k1;
x = Rotate(z + x, 39) * k1;
y = Rotate(y, 33) * k1;
// Decrease len to the nearest multiple of 64, and operate on 64-byte chunks.
len = (len - 1) & ~static_cast<size_t>(63);
do {
x = Rotate(x + y + v.first + UNALIGNED_LOAD64(s + 16), 37) * k1;
y = Rotate(y + v.second + UNALIGNED_LOAD64(s + 48), 42) * k1;
x ^= w.second;
y ^= v.first;
z = Rotate(z ^ w.first, 33);
v = WeakHashLen32WithSeeds(s, v.second * k1, x + w.first);
w = WeakHashLen32WithSeeds(s + 32, z + w.second, y);
std::swap(z, x);
s += 64;
len -= 64;
} while (len != 0);
return HashLen16(HashLen16(v.first, w.first) + ShiftMix(y) * k1 + z,
HashLen16(v.second, w.second) + x);
}
uint64 CityHash64WithSeed(const char *s, size_t len, uint64 seed) {
return CityHash64WithSeeds(s, len, k2, seed);
}
uint64 CityHash64WithSeeds(const char *s, size_t len,
uint64 seed0, uint64 seed1) {
return HashLen16(CityHash64(s, len) - seed0, seed1);
}
// A subroutine for CityHash128(). Returns a decent 128-bit hash for strings
// of any length representable in ssize_t. Based on City and Murmur.
static uint128 CityMurmur(const char *s, size_t len, uint128 seed) {
uint64 a = Uint128Low64(seed);
uint64 b = Uint128High64(seed);
uint64 c = 0;
uint64 d = 0;
ssize_t l = len - 16;
if (l <= 0) { // len <= 16
c = b * k1 + HashLen0to16(s, len);
d = Rotate(a + (len >= 8 ? UNALIGNED_LOAD64(s) : c), 32);
} else { // len > 16
c = HashLen16(UNALIGNED_LOAD64(s + len - 8) + k1, a);
d = HashLen16(b + len, c + UNALIGNED_LOAD64(s + len - 16));
a += d;
do {
a ^= ShiftMix(UNALIGNED_LOAD64(s) * k1) * k1;
a *= k1;
b ^= a;
c ^= ShiftMix(UNALIGNED_LOAD64(s + 8) * k1) * k1;
c *= k1;
d ^= c;
s += 16;
l -= 16;
} while (l > 0);
}
a = HashLen16(a, c);
b = HashLen16(d, b);
return uint128(a ^ b, HashLen16(b, a));
}
uint128 CityHash128WithSeed(const char *s, size_t len, uint128 seed) {
if (len < 128) {
return CityMurmur(s, len, seed);
}
// We expect len >= 128 to be the common case. Keep 56 bytes of state:
// v, w, x, y, and z.
pair<uint64, uint64> v, w;
uint64 x = Uint128Low64(seed);
uint64 y = Uint128High64(seed);
uint64 z = len * k1;
v.first = Rotate(y ^ k1, 49) * k1 + UNALIGNED_LOAD64(s);
v.second = Rotate(v.first, 42) * k1 + UNALIGNED_LOAD64(s + 8);
w.first = Rotate(y + z, 35) * k1 + x;
w.second = Rotate(x + UNALIGNED_LOAD64(s + 88), 53) * k1;
// This is the same inner loop as CityHash64(), manually unrolled.
do {
x = Rotate(x + y + v.first + UNALIGNED_LOAD64(s + 16), 37) * k1;
y = Rotate(y + v.second + UNALIGNED_LOAD64(s + 48), 42) * k1;
x ^= w.second;
y ^= v.first;
z = Rotate(z ^ w.first, 33);
v = WeakHashLen32WithSeeds(s, v.second * k1, x + w.first);
w = WeakHashLen32WithSeeds(s + 32, z + w.second, y);
std::swap(z, x);
s += 64;
x = Rotate(x + y + v.first + UNALIGNED_LOAD64(s + 16), 37) * k1;
y = Rotate(y + v.second + UNALIGNED_LOAD64(s + 48), 42) * k1;
x ^= w.second;
y ^= v.first;
z = Rotate(z ^ w.first, 33);
v = WeakHashLen32WithSeeds(s, v.second * k1, x + w.first);
w = WeakHashLen32WithSeeds(s + 32, z + w.second, y);
std::swap(z, x);
s += 64;
len -= 128;
} while (LIKELY(len >= 128));
y += Rotate(w.first, 37) * k0 + z;
x += Rotate(v.first + z, 49) * k0;
// If 0 < len < 128, hash up to 4 chunks of 32 bytes each from the end of s.
for (size_t tail_done = 0; tail_done < len; ) {
tail_done += 32;
y = Rotate(y - x, 42) * k0 + v.second;
w.first += UNALIGNED_LOAD64(s + len - tail_done + 16);
x = Rotate(x, 49) * k0 + w.first;
w.first += v.first;
v = WeakHashLen32WithSeeds(s + len - tail_done, v.first, v.second);
}
// At this point our 48 bytes of state should contain more than
// enough information for a strong 128-bit hash. We use two
// different 48-byte-to-8-byte hashes to get a 16-byte final result.
x = HashLen16(x, v.first);
y = HashLen16(y, w.first);
return uint128(HashLen16(x + v.second, w.second) + y,
HashLen16(x + w.second, y + v.second));
}
uint128 CityHash128(const char *s, size_t len) {
if (len >= 16) {
return CityHash128WithSeed(s + 16,
len - 16,
uint128(UNALIGNED_LOAD64(s) ^ k3,
UNALIGNED_LOAD64(s + 8)));
} else if (len >= 8) {
return CityHash128WithSeed(NULL,
0,
uint128(UNALIGNED_LOAD64(s) ^ (len * k0),
UNALIGNED_LOAD64(s + len - 8) ^ k1));
} else {
return CityHash128WithSeed(s, len, uint128(k0, k1));
}
}

View File

@ -1,90 +0,0 @@
// Copyright (c) 2011 Google, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
// CityHash Version 1, by Geoff Pike and Jyrki Alakuijala
//
// This file provides a few functions for hashing strings. On x86-64
// hardware in 2011, CityHash64() is faster than other high-quality
// hash functions, such as Murmur. This is largely due to higher
// instruction-level parallelism. CityHash64() and CityHash128() also perform
// well on hash-quality tests.
//
// CityHash128() is optimized for relatively long strings and returns
// a 128-bit hash. For strings more than about 2000 bytes it can be
// faster than CityHash64().
//
// Functions in the CityHash family are not suitable for cryptography.
//
// WARNING: This code has not been tested on big-endian platforms!
// It is known to work well on little-endian platforms that have a small penalty
// for unaligned reads, such as current Intel and AMD moderate-to-high-end CPUs.
//
// By the way, for some hash functions, given strings a and b, the hash
// of a+b is easily derived from the hashes of a and b. This property
// doesn't hold for any hash functions in this file.
#ifndef CITY_HASH_H_
#define CITY_HASH_H_
#include <stdlib.h> // for size_t.
#include <stdint.h>
#include <utility>
typedef uint8_t uint8;
typedef uint32_t uint32;
typedef uint64_t uint64;
typedef std::pair<uint64, uint64> uint128;
inline uint64 Uint128Low64(const uint128& x) { return x.first; }
inline uint64 Uint128High64(const uint128& x) { return x.second; }
// Hash function for a byte array.
uint64 CityHash64(const char *buf, size_t len);
// Hash function for a byte array. For convenience, a 64-bit seed is also
// hashed into the result.
uint64 CityHash64WithSeed(const char *buf, size_t len, uint64 seed);
// Hash function for a byte array. For convenience, two seeds are also
// hashed into the result.
uint64 CityHash64WithSeeds(const char *buf, size_t len,
uint64 seed0, uint64 seed1);
// Hash function for a byte array.
uint128 CityHash128(const char *s, size_t len);
// Hash function for a byte array. For convenience, a 128-bit seed is also
// hashed into the result.
uint128 CityHash128WithSeed(const char *s, size_t len, uint128 seed);
// Hash 128 input bits down to 64 bits of output.
// This is intended to be a reasonably good hash function.
inline uint64 Hash128to64(const uint128& x) {
// Murmur-inspired hashing.
const uint64 kMul = 0x9ddfea08eb382d69ULL;
uint64 a = (Uint128Low64(x) ^ Uint128High64(x)) * kMul;
a ^= (a >> 47);
uint64 b = (Uint128High64(x) ^ a) * kMul;
b ^= (b >> 47);
b *= kMul;
return b;
}
#endif // CITY_HASH_H_

View File

@ -38,13 +38,6 @@ using std::pair;
enum NativeObjectType {
UnknownObjectType = 0,
BatchHandlerType = 1,
MapperType = 2,
ReducerType = 3,
PartitionerType = 4,
CombinerType = 5,
FolderType = 6,
RecordReaderType = 7,
RecordWriterType = 8
};
/**
@ -69,7 +62,6 @@ enum Endium {
#define INPUT_LINE_KV_SEPERATOR "mapreduce.input.keyvaluelinerecordreader.key.value.separator"
#define MAPRED_TEXTOUTPUT_FORMAT_SEPERATOR "mapreduce.output.textoutputformat.separator"
#define MAPRED_WORK_OUT_DIR "mapreduce.task.output.dir"
#define NATIVE_OUTPUT_FILE_NAME "native.output.file.name"
#define MAPRED_COMPRESS_OUTPUT "mapreduce.output.fileoutputformat.compress"
#define MAPRED_OUTPUT_COMPRESSION_CODEC "mapreduce.output.fileoutputformat.compress.codec"
#define TOTAL_ORDER_PARTITIONER_PATH "total.order.partitioner.path"
@ -386,32 +378,6 @@ public:
virtual bool next(Buffer & key, Buffer & value) = 0;
};
class RecordReader : public KVIterator, public Configurable, public Progress {
public:
virtual NativeObjectType type() {
return RecordReaderType;
}
virtual bool next(Buffer & key, Buffer & value) = 0;
virtual float getProgress() = 0;
virtual void close() = 0;
};
class RecordWriter : public Collector, public Configurable {
public:
virtual NativeObjectType type() {
return RecordWriterType;
}
virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
}
virtual void close() {
}
};
class ProcessorBase : public Configurable {
protected:
@ -444,36 +410,6 @@ public:
}
};
class Mapper : public ProcessorBase {
public:
virtual NativeObjectType type() {
return MapperType;
}
/**
* Map interface, default IdenticalMapper
*/
virtual void map(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen) {
collect(key, keyLen, value, valueLen);
}
};
class Partitioner : public Configurable {
public:
virtual NativeObjectType type() {
return PartitionerType;
}
/**
* Partition interface
* @param key key buffer
* @param keyLen key length, can be modified to smaller value
* to truncate key
* @return partition number
*/
virtual uint32_t getPartition(const char * key, uint32_t & keyLen, uint32_t numPartition);
};
enum KeyGroupIterState {
SAME_KEY,
NEW_KEY,
@ -502,80 +438,7 @@ public:
virtual const char * nextValue(uint32_t & len) = 0;
};
class Reducer : public ProcessorBase {
public:
virtual NativeObjectType type() {
return ReducerType;
}
/**
* Reduce interface, default IdenticalReducer
*/
virtual void reduce(KeyGroupIterator & input) {
const char * key;
const char * value;
uint32_t keyLen;
uint32_t valueLen;
key = input.getKey(keyLen);
while (NULL != (value = input.nextValue(valueLen))) {
collect(key, keyLen, value, valueLen);
}
}
};
/**
* Folder API used for hashtable based aggregation
* Folder will be used in this way:
* on(key, value):
* state = hashtable.get(key)
* if state == None:
* size = size()
* if size == -1:
* state = init(null, -1)
* elif size > 0:
* state = fixallocator.get(key)
* init(state, size)
* folder(state, value, value.len)
*
* final():
* for k,state in hashtable:
* final(key, key.len, state)
*/
class Folder : public ProcessorBase {
public:
virtual NativeObjectType type() {
return FolderType;
}
/**
* Get aggregator state size
* @return state storage size
* -1 size not fixed or unknown, default
* e.g. list map tree
* 0 don't need to store state
* >0 fixed sized state
* e.g. int32 int64 float.
*/
virtual int32_t size() {
return -1;
}
/**
* Create and/or init new state
*/
virtual void * init(const char * key, uint32_t keyLen) {
return NULL;
}
/**
* Aggregation function
*/
virtual void folder(void * dest, const char * value, uint32_t valueLen) {
}
virtual void final(const char * key, uint32_t keyLen, void * dest) {
}
};
enum KeyValueType {
TextType = 0,

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "NativeTask.h"
#include "BlockCodec.h"
@ -79,9 +79,8 @@ BlockDecompressStream::BlockDecompressStream(InputStream * stream, uint32_t buff
}
void BlockDecompressStream::init() {
_tempBufferSize = maxCompressedLength(_blockMax) + 8;
_tempBufferSize = maxCompressedLength(_blockMax) + 8;
_tempBuffer = (char*)malloc(_tempBufferSize);
}
BlockDecompressStream::~BlockDecompressStream() {

View File

@ -19,7 +19,7 @@
#ifndef BLOCKCODEC_H_
#define BLOCKCODEC_H_
#include "Compressions.h"
#include "lib/Compressions.h"
namespace NativeTask {

View File

@ -18,7 +18,7 @@
#include <zconf.h>
#include <zlib.h>
#include "commons.h"
#include "lib/commons.h"
#include "GzipCodec.h"
#include <iostream>

View File

@ -19,7 +19,7 @@
#ifndef GZIPCODEC_H_
#define GZIPCODEC_H_
#include "Compressions.h"
#include "lib/Compressions.h"
namespace NativeTask {

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "lz4.h"
#include "NativeTask.h"
#include "Lz4Codec.h"

View File

@ -19,7 +19,7 @@
#ifndef LZ4CODEC_H_
#define LZ4CODEC_H_
#include "Compressions.h"
#include "lib/Compressions.h"
#include "BlockCodec.h"
namespace NativeTask {

View File

@ -19,7 +19,7 @@
#include "config.h"
#if defined HADOOP_SNAPPY_LIBRARY
#include "commons.h"
#include "lib/commons.h"
#include "NativeTask.h"
#include "SnappyCodec.h"

View File

@ -19,7 +19,7 @@
#ifndef SNAPPYCODEC_H_
#define SNAPPYCODEC_H_
#include "Compressions.h"
#include "lib/Compressions.h"
#include "BlockCodec.h"
namespace NativeTask {

View File

@ -16,20 +16,19 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "MCollectorOutputHandler.h"
#include "NativeObjectFactory.h"
#include "MapOutputCollector.h"
#include "lib/NativeObjectFactory.h"
#include "lib/MapOutputCollector.h"
#include "CombineHandler.h"
using std::string;
using std::vector;
namespace NativeTask {
const Command AbstractMapHandler::GET_OUTPUT_PATH(100, "GET_OUTPUT_PATH");
const Command AbstractMapHandler::GET_OUTPUT_INDEX_PATH(101, "GET_OUTPUT_INDEX_PATH");
const Command AbstractMapHandler::GET_SPILL_PATH(102, "GET_SPILL_PATH");
const Command AbstractMapHandler::GET_COMBINE_HANDLER(103, "GET_COMBINE_HANDLER");
} //namespace
} // namespace NativeTask

View File

@ -19,11 +19,11 @@
#ifndef QUICK_BUILD
#include "org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h"
#endif
#include "commons.h"
#include "lib/commons.h"
#include "jni_md.h"
#include "jniutils.h"
#include "lib/jniutils.h"
#include "BatchHandler.h"
#include "NativeObjectFactory.h"
#include "lib/NativeObjectFactory.h"
///////////////////////////////////////////////////////////////
// NativeBatchProcessor jni util methods
@ -130,7 +130,8 @@ void BatchHandler::onSetup(Config * config, char * inputBuffer, uint32_t inputBu
_out.reset(outputBuffer, outputBufferCapacity);
_out.rewind(0, outputBufferCapacity);
LOG("[BatchHandler::onSetup] input Capacity %d, output capacity %d", inputBufferCapacity, _out.limit());
LOG("[BatchHandler::onSetup] input Capacity %d, output capacity %d",
inputBufferCapacity, _out.limit());
}
configure(_config);
}

View File

@ -208,7 +208,6 @@ void CombineHandler::write(char * buf, uint32_t length) {
outputRecordCount++;
remain -= kv->length();
pos += kv->length();
;
}
_combineOutputRecordCount += outputRecordCount;
@ -242,7 +241,8 @@ void CombineHandler::combine(CombineContext type, KVIterator * kvIterator, IFile
this->_writer = writer;
call(COMBINE, NULL);
LOG("[CombineHandler] input Record Count: %d, input Bytes: %d, output Record Count: %d, output Bytes: %d",
LOG("[CombineHandler] input Record Count: %d, input Bytes: %d, "
"output Record Count: %d, output Bytes: %d",
_combineInputRecordCount, _combineInputBytes,
_combineOutputRecordCount, _combineOutputBytes);
return;

View File

@ -18,7 +18,7 @@
#ifndef _COMBINEHANDLER_H_
#define _COMBINEHANDLER_H_
#include "Combiner.h"
#include "lib/Combiner.h"
#include "BatchHandler.h"
namespace NativeTask {

View File

@ -16,12 +16,12 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "lib/TaskCounters.h"
#include "MCollectorOutputHandler.h"
#include "NativeObjectFactory.h"
#include "MapOutputCollector.h"
#include "lib/NativeObjectFactory.h"
#include "lib/MapOutputCollector.h"
#include "CombineHandler.h"
using std::string;

View File

@ -16,123 +16,11 @@
* limitations under the License.
*/
#include "commons.h"
#include "BufferStream.h"
#include "lib/commons.h"
#include "lib/BufferStream.h"
namespace NativeTask {
BufferedInputStream::BufferedInputStream(InputStream * stream, uint32_t bufferSize)
: FilterInputStream(stream), _buff(NULL), _position(0), _limit(0), _capacity(0) {
_buff = (char*)malloc(bufferSize);
if (NULL != _buff) {
LOG("[BuferStream] malloc failed when create BufferedInputStream with buffersize %u",
bufferSize);
_capacity = bufferSize;
}
}
BufferedInputStream::~BufferedInputStream() {
if (NULL != _buff) {
free(_buff);
_buff = NULL;
_position = 0;
_limit = 0;
_capacity = 0;
}
}
void BufferedInputStream::seek(uint64_t position) {
if (_limit - _position > 0) {
THROW_EXCEPTION(IOException, "temporary buffered data exists when fseek()");
}
_stream->seek(position);
}
uint64_t BufferedInputStream::tell() {
return _stream->tell() - (_limit - _position);
}
int32_t BufferedInputStream::read(void * buff, uint32_t length) {
uint32_t rest = _limit - _position;
if (rest > 0) {
// have some data in buffer, read from buffer
uint32_t cp = rest < length ? rest : length;
memcpy(buff, _buff + _position, cp);
_position += cp;
return cp;
} else if (length >= _capacity / 2) {
// dest buffer big enough, read to dest buffer directly
return _stream->read(buff, length);
} else {
// read to buffer first, then copy part of it to dest
_limit = 0;
do {
int32_t rd = _stream->read(_buff + _limit, _capacity - _limit);
if (rd <= 0) {
break;
}
} while (_limit < _capacity / 2);
if (_limit == 0) {
return -1;
}
uint32_t cp = _limit < length ? _limit : length;
memcpy(buff, _buff, cp);
_position = cp;
return cp;
}
}
/////////////////////////////////////////////////////////////////
BufferedOutputStream::BufferedOutputStream(InputStream * stream, uint32_t bufferSize)
: FilterOutputStream(_stream), _buff(NULL), _position(0), _capacity(0) {
_buff = (char*)malloc(bufferSize + sizeof(uint64_t));
if (NULL != _buff) {
LOG("[BuferStream] malloc failed when create BufferedOutputStream with buffersize %u",
bufferSize);
_capacity = bufferSize;
}
}
BufferedOutputStream::~BufferedOutputStream() {
if (NULL != _buff) {
free(_buff);
_buff = NULL;
_position = 0;
_capacity = 0;
}
}
uint64_t BufferedOutputStream::tell() {
return _stream->tell() + _position;
}
void BufferedOutputStream::write(const void * buff, uint32_t length) {
if (length < _capacity / 2) {
uint32_t rest = _capacity - _position;
if (length < rest) {
simple_memcpy(_buff + _position, buff, length);
_position += length;
} else {
flush();
simple_memcpy(_buff, buff, length);
_position = length;
}
} else {
flush();
_stream->write(buff, length);
}
}
void BufferedOutputStream::flush() {
if (_position > 0) {
_stream->write(_buff, _position);
_position = 0;
}
}
///////////////////////////////////////////////////////////
int32_t InputBuffer::read(void * buff, uint32_t length) {
uint32_t rd = _capacity - _position < length ? _capacity - _position : length;
if (rd > 0) {

View File

@ -20,49 +20,12 @@
#define BUFFERSTREAM_H_
#include <string>
#include "Streams.h"
#include "lib/Streams.h"
namespace NativeTask {
using std::string;
class BufferedInputStream : public FilterInputStream {
protected:
char * _buff;
uint32_t _position;
uint32_t _limit;
uint32_t _capacity;
public:
BufferedInputStream(InputStream * stream, uint32_t bufferSize = 64 * 1024);
virtual ~BufferedInputStream();
virtual void seek(uint64_t position);
virtual uint64_t tell();
virtual int32_t read(void * buff, uint32_t length);
};
class BufferedOutputStream : public FilterOutputStream {
protected:
char * _buff;
uint32_t _position;
uint32_t _capacity;
public:
BufferedOutputStream(InputStream * stream, uint32_t bufferSize = 64 * 1024);
virtual ~BufferedOutputStream();
virtual uint64_t tell();
virtual void write(const void * buff, uint32_t length);
virtual void flush();
};
class InputBuffer : public InputStream {
protected:
const char * _buff;

View File

@ -16,84 +16,15 @@
* limitations under the License.
*/
#include "commons.h"
#include <string>
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "util/WritableUtils.h"
#include "Buffers.h"
#include "lib/Buffers.h"
namespace NativeTask {
DynamicBuffer::DynamicBuffer()
: _data(NULL), _capacity(0), _size(0), _used(0) {
}
DynamicBuffer::DynamicBuffer(uint32_t capacity)
: _data(NULL), _capacity(0), _size(0), _used(0) {
reserve(capacity);
}
DynamicBuffer::~DynamicBuffer() {
release();
}
void DynamicBuffer::release() {
if (_data != NULL) {
free(_data);
_data = NULL;
_capacity = 0;
_used = 0;
}
}
void DynamicBuffer::reserve(uint32_t capacity) {
if (_data != NULL) {
if (capacity > _capacity) {
char * newdata = (char*)realloc(_data, capacity);
if (newdata == NULL) {
THROW_EXCEPTION_EX(OutOfMemoryException, "DynamicBuffer reserve realloc %u failed",
capacity);
}
_data = newdata;
_capacity = capacity;
}
return;
}
release();
char * newdata = (char*)malloc(capacity);
if (newdata == NULL) {
THROW_EXCEPTION_EX(OutOfMemoryException, "DynamicBuffer reserve new %u failed", capacity);
}
_data = newdata;
_capacity = capacity;
_size = 0;
_used = 0;
}
int32_t DynamicBuffer::refill(InputStream * stream) {
if (_data == NULL || freeSpace() == 0) {
THROW_EXCEPTION(IOException, "refill DynamicBuffer failed, no space left");
}
int32_t rd = stream->read(_data + _size, freeSpace());
if (rd > 0) {
_size += rd;
}
return rd;
}
void DynamicBuffer::cleanUsed() {
if (_used > 0) {
uint32_t needToMove = _size - _used;
if (needToMove > 0) {
memmove(_data, _data + _used, needToMove);
_size = needToMove;
} else {
_size = 0;
}
_used = 0;
}
}
///////////////////////////////////////////////////////////
ReadBuffer::ReadBuffer()
: _buff(NULL), _remain(0), _size(0), _capacity(0), _stream(NULL), _source(NULL) {

View File

@ -19,69 +19,12 @@
#ifndef BUFFERS_H_
#define BUFFERS_H_
#include "Streams.h"
#include "Compressions.h"
#include "Constants.h"
#include "lib/Streams.h"
#include "lib/Compressions.h"
#include "lib/Constants.h"
namespace NativeTask {
class DynamicBuffer {
protected:
char * _data;
uint32_t _capacity;
uint32_t _size;
uint32_t _used;
public:
DynamicBuffer();
DynamicBuffer(uint32_t capacity);
~DynamicBuffer();
void reserve(uint32_t capacity);
void release();
uint32_t capacity() {
return _capacity;
}
char * data() {
return _data;
}
uint32_t size() {
return _size;
}
uint32_t used() {
return _used;
}
char * current() {
return _data + _used;
}
char * end() {
return _data + _size;
}
uint32_t remain() {
return _size - _used;
}
uint32_t freeSpace() {
return _capacity - _size;
}
void use(uint32_t count) {
_used += count;
}
void cleanUsed();
int32_t refill(InputStream * stream);
};
/**
* A lightweight read buffer, act as buffered input stream

View File

@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Combiner.h"
#include "StringUtil.h"
namespace NativeTask {
NativeCombineRunner::NativeCombineRunner(Config * config, ObjectCreatorFunc combinerCreator)
: _config(config), _combinerCreator(combinerCreator), _keyGroupCount(0) {
if (NULL == _combinerCreator) {
THROW_EXCEPTION_EX(UnsupportException, "Create combiner failed");
}
}
KeyGroupIterator * NativeCombineRunner::createKeyGroupIterator(KVIterator * iter) {
return new KeyGroupIteratorImpl(iter);
}
void NativeCombineRunner::combine(CombineContext context, KVIterator * iterator,
IFileWriter * writer) {
Configurable * combiner = (Configurable *)(_combinerCreator());
if (NULL != combiner) {
combiner->configure(_config);
}
NativeObjectType type = combiner->type();
switch (type) {
case MapperType: {
Mapper * mapper = (Mapper*)combiner;
mapper->setCollector(writer);
Buffer key;
Buffer value;
while (iterator->next(key, value)) {
mapper->map(key.data(), key.length(), value.data(), value.length());
}
mapper->close();
delete mapper;
}
break;
case ReducerType: {
Reducer * reducer = (Reducer*)combiner;
reducer->setCollector(writer);
KeyGroupIterator * kg = createKeyGroupIterator(iterator);
while (kg->nextKey()) {
_keyGroupCount++;
reducer->reduce(*kg);
}
reducer->close();
delete reducer;
}
break;
default:
THROW_EXCEPTION(UnsupportException, "Combiner type not support");
}
}
} /* namespace NativeTask */

View File

@ -18,7 +18,7 @@
#ifndef COMBINER_H_
#define COMBINER_H_
#include "commons.h"
#include "IFile.h"
#include "lib/IFile.h"
namespace NativeTask {
@ -66,21 +66,5 @@ public:
}
};
class NativeCombineRunner : public ICombineRunner {
private:
Config * _config;
ObjectCreatorFunc _combinerCreator;
uint32_t _keyGroupCount;
public:
NativeCombineRunner(Config * config, ObjectCreatorFunc objectCreator);
public:
void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer);
private:
KeyGroupIterator * createKeyGroupIterator(KVIterator * iter);
};
} /* namespace NativeTask */
#endif /* COMBINER_H_ */

View File

@ -16,10 +16,10 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "config.h"
#include "SyncUtils.h"
#include "Compressions.h"
#include "lib/Compressions.h"
#include "util/SyncUtils.h"
#include "codec/GzipCodec.h"
#include "codec/SnappyCodec.h"
#include "codec/Lz4Codec.h"

View File

@ -21,7 +21,7 @@
#include <string>
#include <vector>
#include "Streams.h"
#include "lib/Streams.h"
namespace NativeTask {

View File

@ -20,14 +20,14 @@
#include <fcntl.h>
#include <dirent.h>
#include <sys/stat.h>
#include "commons.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "jniutils.h"
#include "lib/jniutils.h"
#include "NativeTask.h"
#include "TaskCounters.h"
#include "NativeObjectFactory.h"
#include "Path.h"
#include "FileSystem.h"
#include "lib/TaskCounters.h"
#include "lib/NativeObjectFactory.h"
#include "lib/Path.h"
#include "lib/FileSystem.h"
namespace NativeTask {
@ -122,14 +122,15 @@ void FileOutputStream::close() {
/////////////////////////////////////////////////////////////
class RawFileSystem : public FileSystem {
protected:
protected:
string getRealPath(const string & path) {
if (StringUtil::StartsWith(path, "file:")) {
return path.substr(5);
}
return path;
}
public:
public:
InputStream * open(const string & path) {
return new FileInputStream(getRealPath(path));
}
@ -206,7 +207,7 @@ public:
struct stat sb;
if (stat(np.c_str(), &sb) == 0) {
if (S_ISDIR (sb.st_mode) == 0) {
if (S_ISDIR(sb.st_mode) == 0) {
return 1;
}
return 0;
@ -226,7 +227,7 @@ public:
if (mkdir(npath, nmode)) {
return 1;
}
} else if (S_ISDIR (sb.st_mode) == 0) {
} else if (S_ISDIR(sb.st_mode) == 0) {
return 1;
}
*p++ = '/'; /* restore slash */
@ -259,4 +260,4 @@ FileSystem & FileSystem::getLocal() {
return RawFileSystemInstance;
}
} // namespace Hadoap
} // namespace NativeTask
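The RawFileSystem hunk above walks a path component by component, calling mkdir() and stat() and checking S_ISDIR at each step. A hedged, stand-alone sketch of that general "mkdir -p" pattern follows; it is an illustration under POSIX assumptions, not the project's exact implementation (which also handles modes and a "file:" scheme prefix):

    #include <errno.h>
    #include <string>
    #include <sys/stat.h>
    #include <sys/types.h>

    // Create every missing directory along 'path'. Returns true on success.
    static bool makeDirsRecursively(const std::string & path, mode_t mode) {
      std::string partial;
      for (size_t i = 0; i <= path.size(); i++) {
        if ((i == path.size() || path[i] == '/') && !partial.empty()) {
          if (mkdir(partial.c_str(), mode) != 0 && errno != EEXIST) {
            return false;
          }
          struct stat sb;
          if (stat(partial.c_str(), &sb) != 0 || !S_ISDIR(sb.st_mode)) {
            return false;  // component exists but is not a directory
          }
        }
        if (i < path.size()) {
          partial.push_back(path[i]);
        }
      }
      return true;
    }

    int main() {
      // Hypothetical example path, used only to exercise the sketch.
      return makeDirsRecursively("/tmp/nativetask-example/a/b", 0755) ? 0 : 1;
    }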

View File

@ -21,7 +21,7 @@
#include <string>
#include "NativeTask.h"
#include "Streams.h"
#include "lib/Streams.h"
namespace NativeTask {

View File

@ -16,10 +16,10 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "IFile.h"
#include "Compressions.h"
#include "lib/IFile.h"
#include "lib/Compressions.h"
#include "lib/FileSystem.h"
namespace NativeTask {

View File

@ -19,11 +19,11 @@
#ifndef IFILE_H_
#define IFILE_H_
#include "Checksum.h"
#include "Buffers.h"
#include "WritableUtils.h"
#include "SpillInfo.h"
#include "MapOutputSpec.h"
#include "util/Checksum.h"
#include "lib/Buffers.h"
#include "util/WritableUtils.h"
#include "lib/SpillInfo.h"
#include "lib/MapOutputSpec.h"
namespace NativeTask {

View File

@ -15,8 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Iterator.h"
#include "commons.h"
#include "lib/Iterator.h"
#include "lib/commons.h"
namespace NativeTask {
@ -88,4 +88,4 @@ bool KeyGroupIteratorImpl::next() {
return result;
}
} //namespace NativeTask
} // namespace NativeTask

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "Log.h"
#include "lib/Log.h"
namespace NativeTask {

View File

@ -16,19 +16,21 @@
* limitations under the License.
*/
#include "commons.h"
#include <string>
#include "lib/commons.h"
#include "util/Timer.h"
#include "util/StringUtil.h"
#include "FileSystem.h"
#include "NativeObjectFactory.h"
#include "MapOutputCollector.h"
#include "Merge.h"
#include "lib/FileSystem.h"
#include "lib/NativeObjectFactory.h"
#include "lib/MapOutputCollector.h"
#include "lib/Merge.h"
#include "NativeTask.h"
#include "WritableUtils.h"
#include "util/WritableUtils.h"
#include "util/DualPivotQuickSort.h"
#include "Combiner.h"
#include "TaskCounters.h"
#include "MinHeap.h"
#include "lib/Combiner.h"
#include "lib/TaskCounters.h"
#include "lib/MinHeap.h"
namespace NativeTask {
@ -36,22 +38,18 @@ ICombineRunner * CombineRunnerWrapper::createCombiner() {
ICombineRunner * combineRunner = NULL;
if (NULL != _config->get(NATIVE_COMBINER)) {
const char * combinerClass = _config->get(NATIVE_COMBINER);
ObjectCreatorFunc objectCreater = NativeObjectFactory::GetObjectCreator(combinerClass);
if (NULL == objectCreater) {
THROW_EXCEPTION_EX(UnsupportException, "Combiner not found: %s", combinerClass);
} else {
LOG("[MapOutputCollector::configure] native combiner is enabled: %s", combinerClass);
}
combineRunner = new NativeCombineRunner(_config, objectCreater);
// Earlier versions of this code supported user-defined
// native Combiner implementations. This simplified version
// no longer supports it.
THROW_EXCEPTION_EX(UnsupportException, "Native Combiners not supported");
}
CombineHandler * javaCombiner = _spillOutput->getJavaCombineHandler();
if (NULL != javaCombiner) {
_isJavaCombiner = true;
combineRunner = (ICombineRunner *)javaCombiner;
} else {
CombineHandler * javaCombiner = _spillOutput->getJavaCombineHandler();
if (NULL != javaCombiner) {
_isJavaCombiner = true;
combineRunner = (ICombineRunner *)javaCombiner;
} else {
LOG("[MapOutputCollector::getCombiner] cannot get combine handler from java");
}
LOG("[MapOutputCollector::getCombiner] cannot get combine handler from java");
}
return combineRunner;
}
@ -118,7 +116,7 @@ void MapOutputCollector::init(uint32_t defaultBlockSize, uint32_t memoryCapacity
_pool->init(memoryCapacity);
//TODO: add support for customized comparator
// TODO: add support for customized comparator
this->_keyComparator = keyComparator;
_buckets = new PartitionBucket*[_numPartitions];
@ -160,7 +158,8 @@ void MapOutputCollector::configure(Config * config) {
uint32_t capacity = config->getInt(MAPRED_IO_SORT_MB, 300) * 1024 * 1024;
uint32_t defaultBlockSize = getDefaultBlockSize(capacity, _numPartitions, maxBlockSize);
LOG("Native Total MemoryBlockPool: num_partitions %u, min_block_size %uK, max_block_size %uK, capacity %uM", _numPartitions, defaultBlockSize / 1024,
LOG("Native Total MemoryBlockPool: num_partitions %u, min_block_size %uK, "
"max_block_size %uK, capacity %uM", _numPartitions, defaultBlockSize / 1024,
maxBlockSize / 1024, capacity / 1024 / 1024);
ComparatorPtr comparator = getComparator(config, _spec);
@ -179,7 +178,8 @@ void MapOutputCollector::configure(Config * config) {
KVBuffer * MapOutputCollector::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) {
PartitionBucket * partition = getPartition(partitionId);
if (NULL == partition) {
THROW_EXCEPTION_EX(IOException, "Partition is NULL, partition_id: %d, num_partitions: %d", partitionId, _numPartitions);
THROW_EXCEPTION_EX(IOException, "Partition is NULL, partition_id: %d, num_partitions: %d",
partitionId, _numPartitions);
}
KVBuffer * dest = partition->allocateKVBuffer(kvlength);
@ -301,7 +301,7 @@ void MapOutputCollector::middleSpill(const std::string & spillOutput,
info->path = spillOutput;
uint64_t spillTime = timer.now() - timer.last() - metrics.sortTime;
const uint64_t M = 1000000; //million
const uint64_t M = 1000000; // million
LOG("%s-spill: { id: %d, collect: %"PRIu64" ms, "
"in-memory sort: %"PRIu64" ms, in-memory records: %"PRIu64", "
"merge&spill: %"PRIu64" ms, uncompressed size: %"PRIu64", "
@ -369,7 +369,7 @@ void MapOutputCollector::finalSpill(const std::string & filepath,
uint64_t recordCount;
writer->getStatistics(outputSize, realOutputSize, recordCount);
const uint64_t M = 1000000; //million
const uint64_t M = 1000000; // million
LOG("Final-merge-spill: { id: %d, in-memory sort: %"PRIu64" ms, "
"in-memory records: %"PRIu64", merge&spill: %"PRIu64" ms, "
"records: %"PRIu64", uncompressed size: %"PRIu64", "

View File

@ -20,14 +20,14 @@
#define MAP_OUTPUT_COLLECTOR_H_
#include "NativeTask.h"
#include "MemoryPool.h"
#include "Timer.h"
#include "Buffers.h"
#include "MapOutputSpec.h"
#include "IFile.h"
#include "SpillInfo.h"
#include "Combiner.h"
#include "PartitionBucket.h"
#include "lib/MemoryPool.h"
#include "util/Timer.h"
#include "lib/Buffers.h"
#include "lib/MapOutputSpec.h"
#include "lib/IFile.h"
#include "lib/SpillInfo.h"
#include "lib/Combiner.h"
#include "lib/PartitionBucket.h"
#include "lib/SpillOutputService.h"
namespace NativeTask {

View File

@ -16,8 +16,8 @@
* limitations under the License.
*/
#include "commons.h"
#include "MapOutputSpec.h"
#include "lib/commons.h"
#include "lib/MapOutputSpec.h"
#include "NativeTask.h"
namespace NativeTask {

View File

@ -20,8 +20,8 @@
#define MAPOUTPUTSPEC_H_
#include <string>
#include "Checksum.h"
#include "WritableUtils.h"
#include "util/Checksum.h"
#include "util/WritableUtils.h"
#include "NativeTask.h"
namespace NativeTask {

View File

@ -15,18 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "NativeTask.h"
#include "commons.h"
#include "Timer.h"
#include "Buffers.h"
#include "MapOutputSpec.h"
#include "IFile.h"
#include "SpillInfo.h"
#include "Combiner.h"
#include "MapOutputSpec.h"
#include "MemoryBlock.h"
#include "MemoryPool.h"
#include <algorithm>
#include "NativeTask.h"
#include "lib/commons.h"
#include "util/Timer.h"
#include "lib/Buffers.h"
#include "lib/MapOutputSpec.h"
#include "lib/IFile.h"
#include "lib/SpillInfo.h"
#include "lib/Combiner.h"
#include "lib/MemoryBlock.h"
#include "lib/MemoryPool.h"
#include "util/DualPivotQuickSort.h"
namespace NativeTask {
@ -62,4 +64,4 @@ void MemoryBlock::sort(SortAlgorithm type, ComparatorPtr comparator) {
}
_sorted = true;
}
} //namespace NativeTask
} // namespace NativeTask

View File

@ -19,8 +19,8 @@
#ifndef MEMORYPOOL_H_
#define MEMORYPOOL_H_
#include "Buffers.h"
#include "MapOutputSpec.h"
#include "lib/Buffers.h"
#include "lib/MapOutputSpec.h"
#include "NativeTask.h"
#include "util/StringUtil.h"

View File

@ -16,10 +16,10 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "util/Timer.h"
#include "util/StringUtil.h"
#include "Merge.h"
#include "lib/Merge.h"
#include "lib/FileSystem.h"
namespace NativeTask {
@ -34,7 +34,6 @@ Merger::Merger(IFileWriter * writer, Config * config, ComparatorPtr comparator,
ICombineRunner * combineRunner)
: _writer(writer), _config(config), _combineRunner(combineRunner), _first(true),
_comparator(comparator) {
}
Merger::~Merger() {

View File

@ -20,10 +20,10 @@
#define MERGE_H_
#include "NativeTask.h"
#include "Buffers.h"
#include "MapOutputCollector.h"
#include "IFile.h"
#include "MinHeap.h"
#include "lib/Buffers.h"
#include "lib/MapOutputCollector.h"
#include "lib/IFile.h"
#include "lib/MinHeap.h"
namespace NativeTask {

View File

@ -20,7 +20,7 @@
#define MIN_HEAP_H_
#include "NativeTask.h"
#include "Buffers.h"
#include "lib/Buffers.h"
template<typename T, typename Compare>
void heapify(T* first, int rt, int heap_len, Compare & Comp) {

View File

@ -18,9 +18,9 @@
#include <dlfcn.h>
#include "commons.h"
#include "NativeObjectFactory.h"
#include "NativeLibrary.h"
#include "lib/commons.h"
#include "lib/NativeObjectFactory.h"
#include "lib/NativeLibrary.h"
namespace NativeTask {
@ -30,7 +30,6 @@ namespace NativeTask {
NativeLibrary::NativeLibrary(const string & path, const string & name)
: _path(path), _name(name), _getObjectCreatorFunc(NULL), _functionGetter(NULL) {
}
bool NativeLibrary::init() {

View File

@ -20,11 +20,11 @@
#ifndef __CYGWIN__
#include <execinfo.h>
#endif
#include "commons.h"
#include "lib/commons.h"
#include "NativeTask.h"
#include "NativeObjectFactory.h"
#include "NativeLibrary.h"
#include "BufferStream.h"
#include "lib/NativeObjectFactory.h"
#include "lib/NativeLibrary.h"
#include "lib/BufferStream.h"
#include "util/StringUtil.h"
#include "util/SyncUtils.h"
#include "util/WritableUtils.h"
@ -53,19 +53,10 @@ extern "C" void handler(int sig) {
}
DEFINE_NATIVE_LIBRARY(NativeTask) {
//signal(SIGSEGV, handler);
REGISTER_CLASS(BatchHandler, NativeTask);
REGISTER_CLASS(CombineHandler, NativeTask);
REGISTER_CLASS(MCollectorOutputHandler, NativeTask);
REGISTER_CLASS(Mapper, NativeTask);
REGISTER_CLASS(Reducer, NativeTask);
REGISTER_CLASS(Partitioner, NativeTask);
REGISTER_CLASS(Folder, NativeTask);
NativeObjectFactory::SetDefaultClass(BatchHandlerType, "NativeTask.BatchHandler");
NativeObjectFactory::SetDefaultClass(MapperType, "NativeTask.Mapper");
NativeObjectFactory::SetDefaultClass(ReducerType, "NativeTask.Reducer");
NativeObjectFactory::SetDefaultClass(PartitionerType, "NativeTask.Partitioner");
NativeObjectFactory::SetDefaultClass(FolderType, "NativeTask.Folder");
}
namespace NativeTask {
@ -132,7 +123,7 @@ bool NativeObjectFactory::Init() {
}
const char * version = GetConfig().get(NATIVE_HADOOP_VERSION);
LOG("[NativeObjectLibrary] NativeTask library initialized with hadoop %s",
version==NULL?"unkown":version);
version == NULL ? "unkown" : version);
}
return true;
}
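The trimmed DEFINE_NATIVE_LIBRARY block above keeps only the handler registrations; the removed REGISTER_CLASS and SetDefaultClass lines were backed by a name-to-creator registry. A simplified sketch of that generic factory pattern is shown below; the real NativeObjectFactory macros and types are richer and are not reproduced here:

    #include <cstdio>
    #include <map>
    #include <string>

    // Common base type for everything the factory can create.
    struct NativeObjectBase {
      virtual ~NativeObjectBase() {}
      virtual const char * name() const = 0;
    };

    typedef NativeObjectBase * (*ObjectCreator)();

    // Minimal registry: class name -> creator function.
    static std::map<std::string, ObjectCreator> & registry() {
      static std::map<std::string, ObjectCreator> r;
      return r;
    }

    static void registerClass(const std::string & className, ObjectCreator creator) {
      registry()[className] = creator;
    }

    static NativeObjectBase * createObject(const std::string & className) {
      std::map<std::string, ObjectCreator>::iterator it = registry().find(className);
      return it == registry().end() ? NULL : it->second();
    }

    // Example registered class, analogous to a handler registered by the library.
    struct ExampleHandler : public NativeObjectBase {
      virtual const char * name() const { return "ExampleHandler"; }
    };

    static NativeObjectBase * createExampleHandler() { return new ExampleHandler(); }

    int main() {
      registerClass("ExampleHandler", createExampleHandler);
      NativeObjectBase * obj = createObject("ExampleHandler");
      if (obj != NULL) {
        std::printf("created %s\n", obj->name());
        delete obj;
      }
      return 0;
    }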

View File

@ -20,9 +20,9 @@
#include "org_apache_hadoop_mapred_nativetask_NativeRuntime.h"
#endif
#include "config.h"
#include "commons.h"
#include "jniutils.h"
#include "NativeObjectFactory.h"
#include "lib/commons.h"
#include "lib/jniutils.h"
#include "lib/NativeObjectFactory.h"
using namespace NativeTask;
@ -36,8 +36,7 @@ using namespace NativeTask;
* Signature: ([B)Z
*/
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_supportsCompressionCodec
(JNIEnv *jenv, jclass clazz, jbyteArray codec)
{
(JNIEnv *jenv, jclass clazz, jbyteArray codec) {
const std::string codecString = JNU_ByteArrayToString(jenv, codec);
if ("org.apache.hadoop.io.compress.GzipCodec" == codecString) {
return JNI_TRUE;
@ -58,7 +57,8 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntim
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: JNIRelease
* Signature: ()V
*/JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRelease(
*/
JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRelease(
JNIEnv * jenv, jclass nativeRuntimeClass) {
try {
NativeTask::NativeObjectFactory::Release();
@ -82,7 +82,8 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntim
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: JNIConfigure
* Signature: ([[B)V
*/JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIConfigure(
*/
JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIConfigure(
JNIEnv * jenv, jclass nativeRuntimeClass, jobjectArray configs) {
try {
NativeTask::Config & config = NativeTask::NativeObjectFactory::GetConfig();
@ -139,7 +140,8 @@ jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNa
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: JNICreateDefaultNativeObject
* Signature: ([B)J
*/JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateDefaultNativeObject(
*/
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateDefaultNativeObject(
JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray type) {
try {
std::string typeString = JNU_ByteArrayToString(jenv, type);
@ -166,7 +168,8 @@ jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNa
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: JNIReleaseNativeObject
* Signature: (J)V
*/JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIReleaseNativeObject(
*/
JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIReleaseNativeObject(
JNIEnv * jenv, jclass nativeRuntimeClass, jlong objectAddr) {
try {
NativeTask::NativeObject * nobj = ((NativeTask::NativeObject *)objectAddr);
@ -196,7 +199,8 @@ jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNa
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: JNIRegisterModule
* Signature: ([B[B)I
*/JNIEXPORT jint JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRegisterModule(
*/
JNIEXPORT jint JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRegisterModule(
JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray modulePath, jbyteArray moduleName) {
try {
std::string pathString = JNU_ByteArrayToString(jenv, modulePath);
@ -225,7 +229,8 @@ jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNa
* Class: org_apache_hadoop_mapred_nativetask_NativeRuntime
* Method: JNIUpdateStatus
* Signature: ()[B
*/JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIUpdateStatus(
*/
JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIUpdateStatus(
JNIEnv * jenv, jclass nativeRuntimeClass) {
try {
std::string statusData;

View File

@ -18,11 +18,10 @@
#ifndef __CYGWIN__
#include <execinfo.h>
#endif
#include "commons.h"
#include "util/Hash.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "NativeTask.h"
#include "NativeObjectFactory.h"
#include "lib/NativeObjectFactory.h"
namespace NativeTask {
@ -34,20 +33,6 @@ const string NativeObjectTypeToString(NativeObjectType type) {
switch (type) {
case BatchHandlerType:
return string("BatchHandlerType");
case MapperType:
return string("MapperType");
case ReducerType:
return string("ReducerType");
case PartitionerType:
return string("PartitionerType");
case CombinerType:
return string("CombinerType");
case FolderType:
return string("FolderType");
case RecordReaderType:
return string("RecordReaderType");
case RecordWriterType:
return string("RecordWriterType");
default:
return string("UnknownObjectType");
}
@ -56,20 +41,6 @@ const string NativeObjectTypeToString(NativeObjectType type) {
NativeObjectType NativeObjectTypeFromString(const string type) {
if (type == "BatchHandlerType") {
return BatchHandlerType;
} else if (type == "MapperType") {
return MapperType;
} else if (type == "ReducerType") {
return ReducerType;
} else if (type == "PartitionerType") {
return PartitionerType;
} else if (type == "CombinerType") {
return CombinerType;
} else if (type == "FolderType") {
return CombinerType;
} else if (type == "RecordReaderType") {
return RecordReaderType;
} else if (type == "RecordWriterType") {
return RecordWriterType;
}
return UnknownObjectType;
}
@ -96,7 +67,7 @@ HadoopException::HadoopException(const string & what) {
#ifndef __CYGWIN__
size = backtrace(array, 64);
char ** traces = backtrace_symbols(array, size);
for (size_t i = 0; i< size;i++) {
for (size_t i = 0; i < size; i++) {
_reason.append("\n\t");
_reason.append(traces[i]);
}
@ -235,13 +206,6 @@ Counter * ProcessorBase::getCounter(const string & group, const string & name) {
return NULL;
}
uint32_t Partitioner::getPartition(const char * key, uint32_t & keyLen, uint32_t numPartition) {
if (numPartition == 1) {
return 0;
}
return (Hash::BytesHash(key, keyLen) & 0x7fffffff) % numPartition;
}
///////////////////////////////////////////////////////////
}
} // namespace NativeTask
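The Partitioner::getPartition body deleted above used the usual hash-partitioning formula: mask the key hash to a non-negative value, then take it modulo the partition count. A stand-alone sketch that mirrors the removed code, assuming the 31-based byte hash documented in the (also removed) Hash.h header; the claim of compatibility with Java's Text/BytesWritable hashing comes from that header's own comment and should be verified before relying on it:

    #include <cstdio>
    #include <cstring>
    #include <stdint.h>

    // 31-based byte hash, as in the removed Hash::BytesHash (signed overflow
    // wraps on the platforms this code targets, matching the original).
    static int32_t bytesHash(const char * bytes, uint32_t length) {
      int32_t hash = 1;
      for (uint32_t i = 0; i < length; i++) {
        hash = (31 * hash) + (int32_t)bytes[i];
      }
      return hash;
    }

    // Map a key to a partition index in [0, numPartition).
    static uint32_t getPartition(const char * key, uint32_t keyLen, uint32_t numPartition) {
      if (numPartition == 1) {
        return 0;
      }
      return (bytesHash(key, keyLen) & 0x7fffffff) % numPartition;
    }

    int main() {
      const char * key = "example-key";
      std::printf("partition = %u\n", getPartition(key, (uint32_t)std::strlen(key), 4));
      return 0;
    }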

View File

@ -16,19 +16,19 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "util/Timer.h"
#include "util/StringUtil.h"
#include "NativeObjectFactory.h"
#include "PartitionBucket.h"
#include "Merge.h"
#include "lib/NativeObjectFactory.h"
#include "lib/PartitionBucket.h"
#include "lib/Merge.h"
#include "NativeTask.h"
#include "WritableUtils.h"
#include "util/WritableUtils.h"
#include "util/DualPivotQuickSort.h"
#include "Combiner.h"
#include "TaskCounters.h"
#include "MinHeap.h"
#include "PartitionBucketIterator.h"
#include "lib/Combiner.h"
#include "lib/TaskCounters.h"
#include "lib/MinHeap.h"
#include "lib/PartitionBucketIterator.h"
namespace NativeTask {
@ -39,7 +39,8 @@ KVIterator * PartitionBucket::getIterator() {
return new PartitionBucketIterator(this, _keyComparator);
}
void PartitionBucket::spill(IFileWriter * writer) throw (IOException, UnsupportException) {
void PartitionBucket::spill(IFileWriter * writer)
throw(IOException, UnsupportException) {
KVIterator * iterator = getIterator();
if (NULL == iterator || NULL == writer) {
return;
@ -71,6 +72,4 @@ void PartitionBucket::sort(SortAlgorithm type) {
_sorted = true;
}
}
;
// namespace NativeTask
} // namespace NativeTask

View File

@ -20,14 +20,14 @@
#define PARTITION_BUCKET_H_
#include "NativeTask.h"
#include "MemoryPool.h"
#include "MemoryBlock.h"
#include "Timer.h"
#include "Buffers.h"
#include "MapOutputSpec.h"
#include "IFile.h"
#include "SpillInfo.h"
#include "Combiner.h"
#include "lib/MemoryPool.h"
#include "lib/MemoryBlock.h"
#include "util/Timer.h"
#include "lib/Buffers.h"
#include "lib/MapOutputSpec.h"
#include "lib/IFile.h"
#include "lib/SpillInfo.h"
#include "lib/Combiner.h"
namespace NativeTask {

View File

@ -16,18 +16,20 @@
* limitations under the License.
*/
#include "commons.h"
#include <algorithm>
#include "lib/commons.h"
#include "util/Timer.h"
#include "util/StringUtil.h"
#include "NativeObjectFactory.h"
#include "PartitionBucketIterator.h"
#include "Merge.h"
#include "lib/NativeObjectFactory.h"
#include "lib/PartitionBucketIterator.h"
#include "lib/Merge.h"
#include "NativeTask.h"
#include "WritableUtils.h"
#include "util/WritableUtils.h"
#include "util/DualPivotQuickSort.h"
#include "Combiner.h"
#include "TaskCounters.h"
#include "MinHeap.h"
#include "lib/Combiner.h"
#include "lib/TaskCounters.h"
#include "lib/MinHeap.h"
namespace NativeTask {
@ -108,7 +110,5 @@ bool PartitionBucketIterator::next(Buffer & key, Buffer & value) {
}
return false;
}
}
;
// namespace NativeTask
} // namespace NativeTask

View File

@ -20,14 +20,14 @@
#define PARTITION_BUCKET_ITERATOR_H_
#include "NativeTask.h"
#include "MemoryPool.h"
#include "Timer.h"
#include "Buffers.h"
#include "MapOutputSpec.h"
#include "IFile.h"
#include "SpillInfo.h"
#include "Combiner.h"
#include "PartitionBucket.h"
#include "lib/MemoryPool.h"
#include "util/Timer.h"
#include "lib/Buffers.h"
#include "lib/MapOutputSpec.h"
#include "lib/IFile.h"
#include "lib/SpillInfo.h"
#include "lib/Combiner.h"
#include "lib/PartitionBucket.h"
namespace NativeTask {

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "Path.h"
#include "lib/Path.h"
namespace NativeTask {

View File

@ -16,11 +16,11 @@
* limitations under the License.
*/
#include "commons.h"
#include "Streams.h"
#include "FileSystem.h"
#include "Buffers.h"
#include "SpillInfo.h"
#include "lib/commons.h"
#include "lib/Streams.h"
#include "lib/FileSystem.h"
#include "lib/Buffers.h"
#include "lib/SpillInfo.h"
namespace NativeTask {
@ -69,5 +69,5 @@ void SingleSpillInfo::writeSpillInfo(const std::string & filepath) {
delete fout;
}
}
} // namespace NativeTask

View File

@ -16,9 +16,9 @@
* limitations under the License.
*/
#include "commons.h"
#include "Checksum.h"
#include "Streams.h"
#include "lib/commons.h"
#include "util/Checksum.h"
#include "lib/Streams.h"
namespace NativeTask {

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "TaskCounters.h"
#include "lib/TaskCounters.h"
namespace NativeTask {
@ -36,6 +36,5 @@ const char * TaskCounters::FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";
DEFINE_COUNTER(FILE_BYTES_READ)
DEFINE_COUNTER(FILE_BYTES_WRITTEN)
;
} // namespace NativeTask

View File

@ -1,50 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef TRACKING_COLLECTOR_H
#define TRACKING_COLLECTOR_H
#include <stdint.h>
#include <string>
namespace NativeTask {
class TrackingCollector : public Collector {
protected:
Collector * _collector;
Counter * _counter;
public:
TrackingCollector(Collector * collector, Counter * counter)
: _collector(collector), _counter(counter) {
}
virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen) {
_counter->increase();
_collector->collect(key, keyLen, value, valueLen);
}
virtual void collect(const void * key, uint32_t keyLen, const void * value, uint32_t valueLen,
int32_t partition) {
_counter->increase();
_collector->collect(key, keyLen, value, valueLen, partition);
}
};
} //namespace NativeTask
#endif //TRACKING_COLLECTOR_H
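The TrackingCollector.h removed above was a small decorator that bumped a counter on every collect() call before delegating to the wrapped collector. A self-contained sketch of that decorator pattern follows, with stand-in Collector and Counter types; the real NativeTask interfaces are larger than these placeholders:

    #include <cstdio>
    #include <stdint.h>

    // Stand-in for the NativeTask Collector interface.
    class Collector {
    public:
      virtual ~Collector() {}
      virtual void collect(const void * key, uint32_t keyLen,
                           const void * value, uint32_t valueLen) = 0;
    };

    // Stand-in for the NativeTask Counter.
    class Counter {
    public:
      Counter() : _count(0) {}
      void increase() { _count++; }
      uint64_t count() const { return _count; }
    private:
      uint64_t _count;
    };

    // Decorator: counts records, then forwards to the wrapped collector.
    class TrackingCollector : public Collector {
    public:
      TrackingCollector(Collector * collector, Counter * counter)
          : _collector(collector), _counter(counter) {}
      virtual void collect(const void * key, uint32_t keyLen,
                           const void * value, uint32_t valueLen) {
        _counter->increase();
        _collector->collect(key, keyLen, value, valueLen);
      }
    private:
      Collector * _collector;
      Counter * _counter;
    };

    // Trivial sink used only for the demonstration below.
    class NullCollector : public Collector {
    public:
      virtual void collect(const void *, uint32_t, const void *, uint32_t) {}
    };

    int main() {
      NullCollector sink;
      Counter records;
      TrackingCollector tracking(&sink, &records);
      tracking.collect("k", 1, "v", 1);
      tracking.collect("k2", 2, "v2", 2);
      std::printf("records collected: %llu\n", (unsigned long long)records.count());
      return 0;
    }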

View File

@ -41,13 +41,12 @@
#include <map>
#include <algorithm>
#include "primitives.h"
#include "Log.h"
#include "lib/primitives.h"
#include "lib/Log.h"
#include "NativeTask.h"
#include "Constants.h"
#include "lib/Constants.h"
#include "Iterator.h"
#include "TrackingCollector.h"
#include "lib/Iterator.h"
#endif /* COMMONS_H_ */

View File

@ -16,10 +16,10 @@
* limitations under the License.
*/
#include "commons.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "util/SyncUtils.h"
#include "jniutils.h"
#include "lib/jniutils.h"
using namespace NativeTask;
@ -53,7 +53,7 @@ JavaVM * JNU_GetJVM(void) {
JavaVMOption options[noArgs];
options[0].optionString = optHadoopClassPath;
//Create the VM
// Create the VM
JavaVMInitArgs vm_args;
vm_args.version = JNI_VERSION_1_6;
vm_args.options = options;
@ -108,4 +108,4 @@ std::string JNU_ByteArrayToString(JNIEnv * jenv, jbyteArray src) {
return ret;
}
return std::string();
}
}

View File

@ -17,7 +17,7 @@
*/
#include <assert.h>
#include "Checksum.h"
#include "util/Checksum.h"
namespace NativeTask {

View File

@ -1,24 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Hash.h"
namespace NativeTask {
} // namespace NativeTask

View File

@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HASH_H_
#define HASH_H_
#include <stdint.h>
#include <stdlib.h>
// Hash function for a byte array.
extern uint64_t CityHash64(const char *buf, size_t len);
// Hash function for a byte array. For convenience, a 64-bit seed is also
// hashed into the result.
extern uint64_t CityHash64WithSeed(const char *buf, size_t len, uint64_t seed);
namespace NativeTask {
class Hash {
public:
/**
* Compatible with hadoop Text & BytesWritable hash
*/
inline static int32_t BytesHash(const char * bytes, uint32_t length) {
int32_t hash = 1;
for (uint32_t i = 0; i < length; i++)
hash = (31 * hash) + (int32_t)bytes[i];
return hash;
}
/**
* Unsigned version of BytesHash
*/
inline static uint32_t BytesHashU(const char * bytes, uint32_t length) {
uint32_t hash = 1;
for (uint32_t i = 0; i < length; i++)
hash = (31U * hash) + (uint32_t)bytes[i];
return hash;
}
/**
* City hash, faster for longer input
*/
inline static uint64_t CityHash(const char * bytes, uint32_t length) {
return CityHash64(bytes, length);
}
/**
* City hash, faster for longer input
*/
inline static uint64_t CityHashWithSeed(const char * bytes, uint32_t length, uint64_t seed) {
return CityHash64WithSeed(bytes, length, seed);
}
};
} // namespace NativeTask
#endif /* HASH_H_ */

View File

@ -17,8 +17,8 @@
*/
#include <math.h>
#include "commons.h"
#include "Random.h"
#include "lib/commons.h"
#include "util/Random.h"
namespace NativeTask {

View File

@ -17,8 +17,8 @@
*/
#include <stdarg.h>
#include "commons.h"
#include "StringUtil.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
namespace NativeTask {
@ -201,8 +201,9 @@ bool StringUtil::StartsWith(const string & str, const string & prefix) {
}
bool StringUtil::EndsWith(const string & str, const string & suffix) {
if ((suffix.length() > str.length())
|| (memcmp(str.data() + str.length() - suffix.length(), suffix.data(), suffix.length()) != 0)) {
if ((suffix.length() > str.length()) ||
(memcmp(str.data() + str.length() - suffix.length(),
suffix.data(), suffix.length()) != 0)) {
return false;
}
return true;

View File

@ -16,10 +16,10 @@
* limitations under the License.
*/
#include "commons.h"
#include "jniutils.h"
#include "StringUtil.h"
#include "SyncUtils.h"
#include "lib/commons.h"
#include "lib/jniutils.h"
#include "util/StringUtil.h"
#include "util/SyncUtils.h"
namespace NativeTask {
@ -52,110 +52,4 @@ void Lock::unlock() {
PthreadCall("unlock", pthread_mutex_unlock(&_mutex));
}
#ifdef __MACH__
SpinLock::SpinLock() : _spin(0) {
}
SpinLock::~SpinLock() {
}
void SpinLock::lock() {
OSSpinLockLock(&_spin);
}
void SpinLock::unlock() {
OSSpinLockUnlock(&_spin);
}
#else
SpinLock::SpinLock() {
PthreadCall("init mutex", pthread_spin_init(&_spin, 0));
}
SpinLock::~SpinLock() {
PthreadCall("destroy mutex", pthread_spin_destroy(&_spin));
}
void SpinLock::lock() {
PthreadCall("lock", pthread_spin_lock(&_spin));
}
void SpinLock::unlock() {
PthreadCall("unlock", pthread_spin_unlock(&_spin));
}
#endif
Condition::Condition(Lock* mu)
: _lock(mu) {
PthreadCall("init cv", pthread_cond_init(&_condition, NULL));
}
Condition::~Condition() {
PthreadCall("destroy cv", pthread_cond_destroy(&_condition));
}
void Condition::wait() {
PthreadCall("wait", pthread_cond_wait(&_condition, &_lock->_mutex));
}
void Condition::signal() {
PthreadCall("signal", pthread_cond_signal(&_condition));
}
void Condition::signalAll() {
PthreadCall("broadcast", pthread_cond_broadcast(&_condition));
}
void * Thread::ThreadRunner(void * pthis) {
try {
((Thread*)pthis)->run();
} catch (std::exception & e) {
LOG("err!!!! %s", e.what());
}
return NULL;
}
Thread::Thread()
: _thread((pthread_t)0), // safe for linux & macos
_runable(NULL) {
}
Thread::Thread(Runnable * runnable)
: _thread((pthread_t)0), _runable(runnable) {
}
void Thread::setTask(const Runnable & runnable) {
_runable = const_cast<Runnable*>(&runnable);
}
Thread::~Thread() {
}
void Thread::start() {
PthreadCall("pthread_create", pthread_create(&_thread, NULL, ThreadRunner, this));
}
void Thread::join() {
PthreadCall("pthread_join", pthread_join(_thread, NULL));
}
void Thread::stop() {
PthreadCall("pthread_cancel", pthread_cancel(_thread));
}
void Thread::run() {
if (_runable != NULL) {
_runable->run();
}
}
void Thread::EnableJNI() {
JNU_AttachCurrentThread();
}
void Thread::ReleaseJNI() {
JNU_DetachCurrentThread();
}
} // namespace NativeTask

View File

@ -47,38 +47,6 @@ private:
void operator=(const Lock&);
};
class SpinLock {
public:
SpinLock();
~SpinLock();
void lock();
void unlock();
private:
#ifdef __MACH__
OSSpinLock _spin;
#else
pthread_spinlock_t _spin;
#endif
// No copying
SpinLock(const Lock&);
void operator=(const Lock&);
};
class Condition {
public:
explicit Condition(Lock* mu);
~Condition();
void wait();
void signal();
void signalAll();
private:
pthread_cond_t _condition;
Lock* _lock;
};
template<typename LockT>
class ScopeLock {
public:
@ -97,202 +65,6 @@ private:
void operator=(const ScopeLock&);
};
class Runnable {
public:
virtual ~Runnable() {
}
virtual void run() = 0;
};
class Thread : public Runnable {
protected:
pthread_t _thread;
Runnable * _runable;
public:
Thread();
Thread(Runnable * runnable);
virtual ~Thread();
void setTask(const Runnable & runnable);
void start();
void join();
void stop();
virtual void run();
/**
* Enable JNI for current thread
*/
static void EnableJNI();
/**
* Release JNI when thread is at end if current
* thread called EnableJNI before
*/
static void ReleaseJNI();
private:
static void * ThreadRunner(void * pthis);
};
// Sure <tr1/functional> is better
template<typename Subject, typename Method>
class FunctionRunner : public Runnable {
protected:
Subject & _subject;
Method _method;
public:
FunctionRunner(Subject & subject, Method method)
: _subject(subject), _method(method) {
}
virtual void run() {
(_subject.*_method)();
}
};
template<typename Subject, typename Method, typename Arg>
class FunctionRunner1 : public Runnable {
protected:
Subject & _subject;
Method _method;
Arg _arg;
public:
FunctionRunner1(Subject & subject, Method method, Arg arg)
: _subject(subject), _method(method), _arg(arg) {
}
virtual void run() {
(_subject.*_method)(_arg);
}
};
template<typename Subject, typename Method, typename Arg1, typename Arg2>
class FunctionRunner2 : public Runnable {
protected:
Subject & _subject;
Method _method;
Arg1 _arg1;
Arg2 _arg2;
public:
FunctionRunner2(Subject & subject, Method method, Arg1 arg1, Arg2 arg2)
: _subject(subject), _method(method), _arg1(arg1), _arg2(arg2) {
}
virtual void run() {
(_subject.*_method)(_arg1, _arg2);
}
};
template<typename Subject, typename Method>
inline FunctionRunner<Subject, Method> * BindNew(Subject & subject, Method method) {
return new FunctionRunner<Subject, Method>(subject, method);
}
template<typename Subject, typename Method, typename Arg>
inline FunctionRunner1<Subject, Method, Arg> * BindNew(Subject & subject, Method method, Arg arg) {
return new FunctionRunner1<Subject, Method, Arg>(subject, method, arg);
}
template<typename Subject, typename Method, typename Arg1, typename Arg2>
inline FunctionRunner2<Subject, Method, Arg1, Arg2> * BindNew(Subject & subject, Method method,
Arg1 arg1, Arg2 arg2) {
return new FunctionRunner2<Subject, Method, Arg1, Arg2>(subject, method, arg1, arg2);
}
class ConcurrentIndex {
private:
size_t _index;
size_t _end;
SpinLock _lock;
public:
ConcurrentIndex(size_t count)
: _index(0), _end(count) {
}
ConcurrentIndex(size_t start, size_t end)
: _index(start), _end(end) {
}
size_t count() {
return _end;
}
ssize_t next() {
ScopeLock<SpinLock> autoLock(_lock);
if (_index == _end) {
return -1;
} else {
ssize_t ret = _index;
_index++;
return ret;
}
}
};
template<typename Subject, typename Method, typename RangeType>
class ParallelForWorker : public Runnable {
protected:
ConcurrentIndex * _index;
Subject * _subject;
Method _method;
public:
ParallelForWorker()
: _index(NULL), _subject(NULL) {
}
ParallelForWorker(ConcurrentIndex * index, Subject * subject, Method method)
: _index(index), _subject(subject), _method(method) {
}
void reset(ConcurrentIndex * index, Subject * subject, Method method) {
_index = index;
_subject = subject;
_method = method;
}
virtual void run() {
ssize_t i;
while ((i = _index->next()) >= 0) {
(_subject->*_method)(i);
}
}
};
template<typename Subject, typename Method, typename RangeType>
void ParallelFor(Subject & subject, Method method, RangeType begin, RangeType end,
size_t thread_num) {
RangeType count = end - begin;
if (thread_num <= 1 || count <= 1) {
for (RangeType i = begin; i < end; i++) {
(subject.*method)(i);
}
} else if (thread_num == 2) {
ConcurrentIndex index = ConcurrentIndex(begin, end);
ParallelForWorker<Subject, Method, RangeType> workers[2];
Thread sideThread;
workers[0].reset(&index, &subject, method);
workers[1].reset(&index, &subject, method);
sideThread.setTask(workers[0]);
sideThread.start();
workers[1].run();
sideThread.join();
} else {
ConcurrentIndex index = ConcurrentIndex(begin, end);
ParallelForWorker<Subject, Method, RangeType> * workers = new ParallelForWorker<Subject, Method,
RangeType> [thread_num];
Thread * threads = new Thread[thread_num - 1];
for (size_t i = 0; i < thread_num - 1; i++) {
workers[i].reset(&index, &subject, method);
threads[i].setTask(workers[i]);
threads[i].start();
}
workers[thread_num - 1].reset(&index, &subject, method);
workers[thread_num - 1].run();
for (size_t i = 0; i < thread_num - 1; i++) {
threads[i].join();
}
delete[] threads;
delete[] workers;
}
}
} // namespace NativeTask
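The removed SyncUtils helpers above included a ConcurrentIndex plus ParallelFor that handed index ranges out to pthread-based workers, with the calling thread doing part of the work itself. A modern-equivalent sketch using std::thread and std::atomic (C++11) is shown below; it illustrates the same idea but is not the removed implementation:

    #include <atomic>
    #include <cstdio>
    #include <thread>
    #include <vector>

    // Run body(i) for every i in [begin, end) across threadCount threads.
    // Work is handed out through a shared atomic index, mirroring the removed
    // ConcurrentIndex + ParallelForWorker combination.
    template <typename Body>
    void parallelFor(size_t begin, size_t end, size_t threadCount, Body body) {
      std::atomic<size_t> next(begin);
      std::vector<std::thread> threads;
      for (size_t t = 0; t + 1 < threadCount; t++) {
        threads.push_back(std::thread([&]() {
          for (size_t i = next.fetch_add(1); i < end; i = next.fetch_add(1)) {
            body(i);
          }
        }));
      }
      // The calling thread participates as well, as the original code did.
      for (size_t i = next.fetch_add(1); i < end; i = next.fetch_add(1)) {
        body(i);
      }
      for (size_t t = 0; t < threads.size(); t++) {
        threads[t].join();
      }
    }

    int main() {
      std::atomic<size_t> sum(0);
      parallelFor(0, 100, 4, [&](size_t i) { sum.fetch_add(i); });
      std::printf("sum = %zu\n", sum.load());
      return 0;
    }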

View File

@ -17,9 +17,9 @@
*/
#include <time.h>
#include "commons.h"
#include "StringUtil.h"
#include "Timer.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "util/Timer.h"
namespace NativeTask {
@ -51,7 +51,6 @@ Timer::Timer() {
}
Timer::~Timer() {
}
uint64_t Timer::last() {

View File

@ -16,9 +16,9 @@
* limitations under the License.
*/
#include "commons.h"
#include "StringUtil.h"
#include "WritableUtils.h"
#include "lib/commons.h"
#include "util/StringUtil.h"
#include "util/WritableUtils.h"
namespace NativeTask {

View File

@ -21,7 +21,7 @@
#include <stdint.h>
#include <string>
#include "Streams.h"
#include "lib/Streams.h"
#include "NativeTask.h"
namespace NativeTask {

View File

@ -16,13 +16,13 @@
* limitations under the License.
*/
#include "commons.h"
#include "BufferStream.h"
#include "Buffers.h"
#include "lib/commons.h"
#include "lib/BufferStream.h"
#include "lib/Buffers.h"
#include "test_commons.h"
#include "NativeTask.h"
using namespace NativeTask;
namespace NativeTask {
TEST(Command, equals) {
Command cmd1(100, "hello command");
@ -36,3 +36,4 @@ TEST(Command, equals) {
ASSERT_EQ(helloCommand, cmd1.description());
}
} // namespace NativeTask

View File

@ -18,11 +18,11 @@
#include "lz4.h"
#include "config.h"
#include "commons.h"
#include "Path.h"
#include "BufferStream.h"
#include "FileSystem.h"
#include "Compressions.h"
#include "lib/commons.h"
#include "lib/Path.h"
#include "lib/BufferStream.h"
#include "lib/FileSystem.h"
#include "lib/Compressions.h"
#include "test_commons.h"
#if defined HADOOP_SNAPPY_LIBRARY
@ -144,7 +144,7 @@ TEST(Perf, CompressionUtil) {
}
class CompressResult {
public:
public:
uint64_t uncompressedSize;
uint64_t compressedSize;
uint64_t compressTime;
@ -193,10 +193,8 @@ void MeasureSingleFileLz4(const string & path, CompressResult & total, size_t bl
result.compressTime += endTime - startTime;
startTime = t.now();
for (int i = 0; i < times; i++) {
// memset(dest, 0, currentblocksize+8);
int osize = LZ4_uncompress(outputBuffer, dest, currentblocksize);
ASSERT_EQ(currentblocksize, osize);
// printf("%016llx blocksize: %lu\n", bswap64(*(uint64_t*)(dest+currentblocksize)), currentblocksize);
}
endTime = t.now();
result.uncompressTime += endTime - startTime;

View File

@ -16,9 +16,9 @@
* limitations under the License.
*/
#include "commons.h"
#include "BufferStream.h"
#include "Buffers.h"
#include "lib/commons.h"
#include "lib/BufferStream.h"
#include "lib/Buffers.h"
#include "test_commons.h"
float absoute(float v) {

View File

@ -16,10 +16,10 @@
* limitations under the License.
*/
#include "commons.h"
#include "NativeObjectFactory.h"
#include "BufferStream.h"
#include "Buffers.h"
#include "lib/commons.h"
#include "lib/NativeObjectFactory.h"
#include "lib/BufferStream.h"
#include "lib/Buffers.h"
#include "test_commons.h"
TEST(Counter, Counter) {

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
#include "FileSystem.h"
#include "lib/FileSystem.h"
#include "test_commons.h"
TEST(FileSystem, RawFileSystem) {

View File

@ -17,11 +17,11 @@
*/
#include <algorithm>
#include "commons.h"
#include "lib/commons.h"
#include "config.h"
#include "BufferStream.h"
#include "FileSystem.h"
#include "IFile.h"
#include "lib/BufferStream.h"
#include "lib/FileSystem.h"
#include "lib/IFile.h"
#include "test_commons.h"
SingleSpillInfo * writeIFile(int partition, vector<pair<string, string> > & kvs,
@ -166,8 +166,8 @@ TEST(Perf, IFile) {
delete[] buff;
}
// The Glibc has a bug in the file tell api, it will overwrite the file data
// unexpected.
// The Glibc has a bug in the file tell api, it will overwrite the file data
// unexpected.
// Please check https://rhn.redhat.com/errata/RHBA-2013-0279.html
// This case is to check whether the bug exists.
// If it exists, it means you need to upgrade the glibc.
@ -189,12 +189,11 @@ TEST(IFile, TestGlibCBug) {
uint32_t length = 0;
reader->nextPartition();
uint32_t index = 0;
while(NULL != (key = reader->nextKey(length))) {
while (NULL != (key = reader->nextKey(length))) {
int32_t realKey = (int32_t)bswap(*(uint32_t *)(key));
ASSERT_LT(index, 5);
ASSERT_EQ(expect[index], realKey);
index++;
}
delete reader;
}

View File

@ -23,10 +23,10 @@
#endif
#include <stdexcept>
#include "commons.h"
#include "Buffers.h"
#include "FileSystem.h"
#include "NativeObjectFactory.h"
#include "lib/commons.h"
#include "lib/Buffers.h"
#include "lib/FileSystem.h"
#include "lib/NativeObjectFactory.h"
#include "test_commons.h"
extern "C" {
@ -52,11 +52,11 @@ void handler(int sig) {
}
}
using namespace NativeTask;
typedef char * CString;
int main(int argc, char ** argv) {
namespace NativeTask {
int DoMain(int argc, char** argv) {
signal(SIGSEGV, handler);
CString * newArgv = new CString[argc + 1];
memcpy(newArgv, argv, argc * sizeof(CString));
@ -107,3 +107,10 @@ int main(int argc, char ** argv) {
return 1;
}
}
} // namespace NativeTask
int main(int argc, char ** argv) {
return NativeTask::DoMain(argc, argv);
}
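The test-runner change above moves the body of main() into NativeTask::DoMain() and leaves a thin main() that only delegates, keeping the bulk of the logic inside the project namespace. A minimal sketch of that shape, with a placeholder body standing in for the real argument handling:

    #include <cstdio>

    namespace NativeTask {

    // All of the real work lives inside the project namespace.
    int DoMain(int argc, char ** argv) {
      std::printf("running with %d argument(s)\n", argc - 1);
      for (int i = 1; i < argc; i++) {
        std::printf("  arg %d: %s\n", i, argv[i]);
      }
      return 0;
    }

    } // namespace NativeTask

    // Thin entry point that only forwards to the namespaced implementation.
    int main(int argc, char ** argv) {
      return NativeTask::DoMain(argc, argv);
    }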

View File

@ -166,7 +166,7 @@ inline char * memchrbrf2(char * p, char ch, size_t len) {
// not safe in MACOSX, segment fault, should be safe on Linux with out mmap
inline int memchr_sse(const char *s, int c, int len) {
//len : edx; c: esi; s:rdi
// len : edx; c: esi; s:rdi
int index = 0;
#ifdef __X64

View File

@ -16,10 +16,10 @@
* limitations under the License.
*/
#include "commons.h"
#include "Streams.h"
#include "Buffers.h"
#include "DualPivotQuickSort.h"
#include "lib/commons.h"
#include "lib/Streams.h"
#include "lib/Buffers.h"
#include "util/DualPivotQuickSort.h"
#include "test_commons.h"
string gBuffer;
@ -82,7 +82,7 @@ static int compare_offset(const void * plh, const void * prh) {
* dualpivot sort compare function
*/
class CompareOffset {
public:
public:
int64_t operator()(uint32_t lhs, uint32_t rhs) {
KVBuffer * lhb = (KVBuffer*)get_position(lhs);
@ -101,7 +101,7 @@ public:
* quicksort compare function
*/
class OffsetLessThan {
public:
public:
bool operator()(uint32_t lhs, uint32_t rhs) {
KVBuffer * lhb = (KVBuffer*)get_position(lhs);
KVBuffer * rhb = (KVBuffer*)get_position(rhs);
@ -132,7 +132,7 @@ static int compare_offset2(const void * plh, const void * prh) {
* dualpivot sort compare function
*/
class CompareOffset2 {
public:
public:
int64_t operator()(uint32_t lhs, uint32_t rhs) {
KVBuffer * lhb = (KVBuffer*)get_position(lhs);
@ -151,7 +151,7 @@ public:
* quicksort compare function
*/
class OffsetLessThan2 {
public:
public:
bool operator()(uint32_t lhs, uint32_t rhs) {
KVBuffer * lhb = (KVBuffer*)get_position(lhs);
@ -163,23 +163,6 @@ public:
}
};
/*
void makeInput(string & dest, vector<uint32_t> & offsets, uint64_t length) {
TeraGen tera = TeraGen(length / 100, 1, 0);
dest.reserve(length + 1024);
string k, v;
while (tera.next(k, v)) {
offsets.push_back(dest.length());
uint32_t tempLen = k.length();
dest.append((const char *)&tempLen, 4);
dest.append(k.data(), k.length());
tempLen = v.length();
dest.append((const char *)&tempLen, 4);
dest.append(v.data(), v.length());
}
}
*/
void makeInputWord(string & dest, vector<uint32_t> & offsets, uint64_t length) {
Random r;
dest.reserve(length + 1024);

Some files were not shown because too many files have changed in this diff.