HADOOP-11505. Various native parts use bswap incorrectly and unportably (Alan Burlison via aw)

commit 6725e7f1be
parent 9f256d1d71
@@ -537,6 +537,9 @@ Trunk (Unreleased)
 
     HADOOP-12553. [JDK8] Fix javadoc error caused by illegal tag. (aajisaka)
 
+    HADOOP-11505. Various native parts use bswap incorrectly and unportably
+    (Alan Burlison via aw)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)
@@ -41,6 +41,131 @@ endif()
 # Configure JNI.
 include(HadoopJNI)
 
+#
+# Endian configuration, as per http://austingroupbugs.net/view.php?id=162#c665
+#
+
+# Work out the endianness, set header macro values.
+include(TestBigEndian)
+include(CheckIncludeFile)
+include(CheckSymbolExists)
+test_big_endian(_bigendian)
+if(_bigendian)
+  set(HADOOP_BYTE_ORDER "HADOOP_BIG_ENDIAN")
+else()
+  set(HADOOP_BYTE_ORDER "HADOOP_LITTLE_ENDIAN")
+endif()
+
+# Linux, NetBSD, FreeBSD and OpenBSD all provide htoXXX definitions in endian.h or sys/endian.h.
+check_include_file("endian.h" _endian_h)
+if (_endian_h)
+  set(HADOOP_ENDIAN_H "endian.h")
+else()
+  check_include_file("sys/endian.h" _sys_endian_h)
+  if (_sys_endian_h)
+    set(HADOOP_ENDIAN_H "sys/endian.h")
+  endif()
+endif()
+if(DEFINED HADOOP_ENDIAN_H)
+  check_symbol_exists("be64toh" ${HADOOP_ENDIAN_H} _be64toh)
+  if( _be64toh)
+    set(HADOOP_HTOBE16 "htobe16")
+    set(HADOOP_HTOLE16 "htole16")
+    set(HADOOP_BE16TOH "be16toh")
+    set(HADOOP_LE16TOH "le16toh")
+    set(HADOOP_HTOBE32 "htobe32")
+    set(HADOOP_HTOLE32 "htole32")
+    set(HADOOP_BE32TOH "be32toh")
+    set(HADOOP_LE32TOH "le32toh")
+    set(HADOOP_HTOBE64 "htobe64")
+    set(HADOOP_HTOLE64 "htole64")
+    set(HADOOP_BE64TOH "be64toh")
+    set(HADOOP_LE64TOH "le64toh")
+    set(_have_endian TRUE)
+    unset(_be64toh)
+  else()
+    message(FATAL_ERROR "endian.h located but doesn't contain be64toh")
+  endif()
+endif()
+
+# Solaris doesn't provide htoXXX, we have to provide alternatives.
+if(NOT _have_endian)
+  check_include_file("sys/byteorder.h" _sys_byteorder_h)
+  if(_sys_byteorder_h)
+    set(HADOOP_ENDIAN_H "sys/byteorder.h")
+    check_symbol_exists("BSWAP_64" ${HADOOP_ENDIAN_H} _bswap_64)
+  endif()
+  if(_sys_byteorder_h AND _bswap_64)
+    if(_bigendian)
+      set(HADOOP_HTOBE16 "")
+      set(HADOOP_HTOLE16 "BSWAP_16")
+      set(HADOOP_BE16TOH "")
+      set(HADOOP_LE16TOH "BSWAP_16")
+      set(HADOOP_HTOBE32 "")
+      set(HADOOP_HTOLE32 "BSWAP_32")
+      set(HADOOP_BE32TOH "")
+      set(HADOOP_LE32TOH "BSWAP_32")
+      set(HADOOP_HTOBE64 "")
+      set(HADOOP_HTOLE64 "BSWAP_64")
+      set(HADOOP_BE64TOH "")
+      set(HADOOP_LE64TOH "BSWAP_64")
+    else()
+      set(HADOOP_HTOBE16 "BSWAP_16")
+      set(HADOOP_HTOLE16 "")
+      set(HADOOP_BE16TOH "BSWAP_16")
+      set(HADOOP_LE16TOH "")
+      set(HADOOP_HTOBE32 "BSWAP_32")
+      set(HADOOP_HTOLE32 "")
+      set(HADOOP_BE32TOH "BSWAP_32")
+      set(HADOOP_LE32TOH "")
+      set(HADOOP_HTOBE64 "BSWAP_64")
+      set(HADOOP_HTOLE64 "")
+      set(HADOOP_BE64TOH "BSWAP_64")
+      set(HADOOP_LE64TOH "")
+    endif()
+    set(_have_endian TRUE)
+    unset(_sys_byteorder_h)
+    unset(_bswap_64)
+  endif()
+endif()
+
+# OSX uses libkern/OSByteOrder.h and OSSwapXtoY.
+if(NOT _have_endian)
+  check_include_file("libkern/OSByteOrder.h" _libkern_osbyteorder_h)
+  if(_libkern_osbyteorder_h)
+    set(HADOOP_ENDIAN_H "libkern/OSByteOrder.h")
+    check_symbol_exists("OSSwapHostToLittleInt64" ${HADOOP_ENDIAN_H} _osswaphosttolittleint64)
+  endif()
+  if(_libkern_osbyteorder_h AND _osswaphosttolittleint64)
+    set(HADOOP_HTOBE16 "OSSwapHostToBigInt16")
+    set(HADOOP_HTOLE16 "OSSwapHostToLittleInt16")
+    set(HADOOP_BE16TOH "OSSwapBigToHostInt16")
+    set(HADOOP_LE16TOH "OSSwapLittleToHostInt16")
+    set(HADOOP_HTOBE32 "OSSwapHostToBigInt32")
+    set(HADOOP_HTOLE32 "OSSwapHostToLittleInt32")
+    set(HADOOP_BE32TOH "OSSwapBigToHostInt32")
+    set(HADOOP_LE32TOH "OSSwapLittleToHostInt32")
+    set(HADOOP_HTOBE64 "OSSwapHostToBigInt64")
+    set(HADOOP_HTOLE64 "OSSwapHostToLittleInt64")
+    set(HADOOP_BE64TOH "OSSwapBigToHostInt64")
+    set(HADOOP_LE64TOH "OSSwapLittleToHostInt64")
+    set(_have_endian TRUE)
+    unset(_libkern_osbyteorder_h)
+    unset(_osswaphosttolittleint64)
+  endif()
+endif()
+
+# Bail if we don't know the endian definitions for this platform.
+if(NOT _have_endian)
+  message(FATAL_ERROR "Can't provide endianness definitions for this platform")
+endif()
+
+# Configure the hadoop_endian.h header file.
+configure_file(${CMAKE_SOURCE_DIR}/hadoop_endian.h.cmake ${CMAKE_BINARY_DIR}/hadoop_endian.h)
+unset(_bigendian)
+unset(_have_endian)
+unset(HADOOP_ENDIAN_H)
+
 # Require zlib.
 set(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
 hadoop_set_find_shared_library_version("1")
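Note: the configure_file() call above consumes a hadoop_endian.h.cmake template that is not shown in this diff. A minimal sketch of what such a template could look like, assuming the cache variables set above and the hadoop_htole32()/hadoop_be32toh()-style wrappers used by the C and C++ changes below (illustrative only, not the committed file):

    /* Hypothetical hadoop_endian.h.cmake template.  CMake replaces each @VAR@
     * with the value chosen by the platform checks above; an empty value turns
     * the wrapper into a plain pass-through of its argument. */
    #ifndef HADOOP_ENDIAN_H
    #define HADOOP_ENDIAN_H

    #include <@HADOOP_ENDIAN_H@>

    #define @HADOOP_BYTE_ORDER@ 1

    #define hadoop_htobe16(X) @HADOOP_HTOBE16@(X)
    #define hadoop_htole16(X) @HADOOP_HTOLE16@(X)
    #define hadoop_be16toh(X) @HADOOP_BE16TOH@(X)
    #define hadoop_le16toh(X) @HADOOP_LE16TOH@(X)
    #define hadoop_htobe32(X) @HADOOP_HTOBE32@(X)
    #define hadoop_htole32(X) @HADOOP_HTOLE32@(X)
    #define hadoop_be32toh(X) @HADOOP_BE32TOH@(X)
    #define hadoop_le32toh(X) @HADOOP_LE32TOH@(X)
    #define hadoop_htobe64(X) @HADOOP_HTOBE64@(X)
    #define hadoop_htole64(X) @HADOOP_HTOLE64@(X)
    #define hadoop_be64toh(X) @HADOOP_BE64TOH@(X)
    #define hadoop_le64toh(X) @HADOOP_LE64TOH@(X)

    #endif /* HADOOP_ENDIAN_H */

On Linux and the BSDs the placeholders expand to the native htobeXX()/beXXtoh() macros; on Solaris and OS X they map onto BSWAP_* and OSSwap*, with the conversions that already match host order expanding to empty, i.e. to no-ops.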
@@ -37,6 +37,7 @@
 #include "crc32c_tables.h"
 #include "bulk_crc32.h"
 #include "gcc_optimizations.h"
+#include "hadoop_endian.h"
 
 #define CRC_INITIAL_VAL 0xffffffff
 
@@ -163,7 +164,7 @@ static uint32_t crc32c_sb8(uint32_t crc, const uint8_t *buf, size_t length) {
   for (li=0; li < running_length/8; li++) {
     uint32_t term1;
     uint32_t term2;
-    crc ^= *(uint32_t *)buf;
+    crc ^= hadoop_htole32(*(uint32_t *)buf);
     buf += 4;
     term1 = CRC32C_T8_7[crc & 0x000000FF] ^
         CRC32C_T8_6[(crc >> 8) & 0x000000FF];
@@ -171,10 +172,10 @@ static uint32_t crc32c_sb8(uint32_t crc, const uint8_t *buf, size_t length) {
     crc = term1 ^
         CRC32C_T8_5[term2 & 0x000000FF] ^
         CRC32C_T8_4[(term2 >> 8) & 0x000000FF];
-    term1 = CRC32C_T8_3[(*(uint32_t *)buf) & 0x000000FF] ^
-        CRC32C_T8_2[((*(uint32_t *)buf) >> 8) & 0x000000FF];
+    term1 = CRC32C_T8_3[hadoop_htole32(*(uint32_t *)buf) & 0x000000FF] ^
+        CRC32C_T8_2[(hadoop_htole32(*(uint32_t *)buf) >> 8) & 0x000000FF];
 
-    term2 = (*(uint32_t *)buf) >> 16;
+    term2 = hadoop_htole32((*(uint32_t *)buf)) >> 16;
     crc = crc ^
         term1 ^
         CRC32C_T8_1[term2 & 0x000000FF] ^
@@ -209,7 +210,7 @@ static uint32_t crc32_zlib_sb8(
   for (li=0; li < running_length/8; li++) {
     uint32_t term1;
     uint32_t term2;
-    crc ^= *(uint32_t *)buf;
+    crc ^= hadoop_htole32(*(uint32_t *)buf);
     buf += 4;
     term1 = CRC32_T8_7[crc & 0x000000FF] ^
         CRC32_T8_6[(crc >> 8) & 0x000000FF];
@@ -217,10 +218,10 @@ static uint32_t crc32_zlib_sb8(
     crc = term1 ^
         CRC32_T8_5[term2 & 0x000000FF] ^
         CRC32_T8_4[(term2 >> 8) & 0x000000FF];
-    term1 = CRC32_T8_3[(*(uint32_t *)buf) & 0x000000FF] ^
-        CRC32_T8_2[((*(uint32_t *)buf) >> 8) & 0x000000FF];
+    term1 = CRC32_T8_3[hadoop_htole32(*(uint32_t *)buf) & 0x000000FF] ^
+        CRC32_T8_2[(hadoop_htole32(*(uint32_t *)buf) >> 8) & 0x000000FF];
 
-    term2 = (*(uint32_t *)buf) >> 16;
+    term2 = hadoop_htole32(*(uint32_t *)buf) >> 16;
     crc = crc ^
         term1 ^
         CRC32_T8_1[term2 & 0x000000FF] ^
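The slicing-by-8 CRC tables are generated for little-endian 32-bit word loads, so each word pulled from the buffer is passed through hadoop_htole32() before it is folded into the running CRC. On a little-endian host the conversion compiles away and the generated code is unchanged; on a big-endian host it byte-swaps the load. A stand-alone sketch of that behaviour (demo_htole32 is an illustrative stand-in, not the configured macro):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Illustrative stand-in for the configured hadoop_htole32(): identity on a
     * little-endian host, a byte swap on a big-endian host. */
    static uint32_t demo_htole32(uint32_t v) {
      const union { uint16_t u16; uint8_t u8[2]; } probe = { 0x0102 };
      if (probe.u8[0] == 0x02) {            /* little-endian host: no-op */
        return v;
      }
      return ((v & 0x000000FFu) << 24) |    /* big-endian host: swap */
             ((v & 0x0000FF00u) << 8)  |
             ((v & 0x00FF0000u) >> 8)  |
             ((v & 0xFF000000u) >> 24);
    }

    int main(void) {
      const uint8_t buf[4] = { 0x11, 0x22, 0x33, 0x44 };  /* bytes in stream order */
      uint32_t word;
      memcpy(&word, buf, 4);                /* raw, host-order load as in crc32c_sb8 */
      /* Prints 0x44332211 on either host: 0x11, the first stream byte, ends up in
       * the low-order position, which is the layout the CRC32C_T8_* / CRC32_T8_*
       * tables assume. */
      printf("0x%08x\n", demo_htole32(word));
      return 0;
    }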
@@ -90,6 +90,7 @@ include_directories(
     ${SRC}/src/util
     ${SRC}/src/lib
     ${SRC}/test
+    ../../../../hadoop-common-project/hadoop-common/target/native
     ${CMAKE_CURRENT_SOURCE_DIR}
     ${CMAKE_BINARY_DIR}
     ${JNI_INCLUDE_DIRS}
@@ -42,8 +42,8 @@
  * <code>
  * int HivePlatform::HiveKeyComparator(const char * src, uint32_t srcLength,
  *     const char * dest, uint32_t destLength) {
- *   uint32_t sl = bswap(*(uint32_t*)src);
- *   uint32_t dl = bswap(*(uint32_t*)dest);
+ *   uint32_t sl = hadoop_be32toh(*(uint32_t*)src);
+ *   uint32_t dl = hadoop_be32toh(*(uint32_t*)dest);
  *   return NativeObjectFactory::BytesComparator(src + 4, sl, dest + 4, dl);
  * }
  * </code>

@@ -40,15 +40,6 @@ enum NativeObjectType {
   BatchHandlerType = 1,
 };
 
-/**
- * Enduim setting
- *
- */
-enum Endium {
-  LITTLE_ENDIUM = 0,
-  LARGE_ENDIUM = 1
-};
-
 #define NATIVE_COMBINER "native.combiner.class"
 #define NATIVE_PARTITIONER "native.partitioner.class"
 #define NATIVE_MAPPER "native.mapper.class"
@@ -104,8 +104,8 @@ int32_t BlockDecompressStream::read(void * buff, uint32_t length) {
     THROW_EXCEPTION(IOException, "readFully get incomplete data");
   }
   _compressedBytesRead += rd;
-  sizes[0] = bswap(sizes[0]);
-  sizes[1] = bswap(sizes[1]);
+  sizes[0] = hadoop_be32toh(sizes[0]);
+  sizes[1] = hadoop_be32toh(sizes[1]);
   if (sizes[0] <= length) {
     uint32_t len = decompressOneBlock(sizes[1], buff, sizes[0]);
     if (len != sizes[0]) {

@@ -38,8 +38,8 @@ void Lz4CompressStream::compressOneBlock(const void * buff, uint32_t length) {
   int ret = LZ4_compress((char*)buff, _tempBuffer + 8, length);
   if (ret > 0) {
     compressedLength = ret;
-    ((uint32_t*)_tempBuffer)[0] = bswap(length);
-    ((uint32_t*)_tempBuffer)[1] = bswap((uint32_t)compressedLength);
+    ((uint32_t*)_tempBuffer)[0] = hadoop_be32toh(length);
+    ((uint32_t*)_tempBuffer)[1] = hadoop_be32toh((uint32_t)compressedLength);
     _stream->write(_tempBuffer, compressedLength + 8);
     _compressedBytesWritten += (compressedLength + 8);
   } else {

@@ -37,8 +37,8 @@ void SnappyCompressStream::compressOneBlock(const void * buff, uint32_t length)
   snappy_status ret = snappy_compress((const char*)buff, length, _tempBuffer + 8,
       &compressedLength);
   if (ret == SNAPPY_OK) {
-    ((uint32_t*)_tempBuffer)[0] = bswap(length);
-    ((uint32_t*)_tempBuffer)[1] = bswap((uint32_t)compressedLength);
+    ((uint32_t*)_tempBuffer)[0] = hadoop_be32toh(length);
+    ((uint32_t*)_tempBuffer)[1] = hadoop_be32toh((uint32_t)compressedLength);
     _stream->write(_tempBuffer, compressedLength + 8);
     _compressedBytesWritten += (compressedLength + 8);
   } else if (ret == SNAPPY_INVALID_INPUT) {
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
 #include "CombineHandler.h"
+
 namespace NativeTask {
@@ -48,8 +49,8 @@ uint32_t CombineHandler::feedDataToJavaInWritableSerialization() {
 
   if (_kvCached) {
     uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength();
-    outputInt(bswap(_key.outerLength));
-    outputInt(bswap(_value.outerLength));
+    outputInt(hadoop_be32toh(_key.outerLength));
+    outputInt(hadoop_be32toh(_value.outerLength));
     outputKeyOrValue(_key, _kType);
     outputKeyOrValue(_value, _vType);
 
@@ -73,8 +74,8 @@ uint32_t CombineHandler::feedDataToJavaInWritableSerialization() {
     } else {
       firstKV = false;
       //write final key length and final value length
-      outputInt(bswap(_key.outerLength));
-      outputInt(bswap(_value.outerLength));
+      outputInt(hadoop_be32toh(_key.outerLength));
+      outputInt(hadoop_be32toh(_value.outerLength));
       outputKeyOrValue(_key, _kType);
       outputKeyOrValue(_value, _vType);
 
@@ -101,7 +102,7 @@ void CombineHandler::outputKeyOrValue(SerializeInfo & KV, KeyValueType type) {
     output(KV.buffer.data(), KV.buffer.length());
     break;
   case BytesType:
-    outputInt(bswap(KV.buffer.length()));
+    outputInt(hadoop_be32toh(KV.buffer.length()));
     output(KV.buffer.data(), KV.buffer.length());
     break;
   default:
@@ -202,8 +203,8 @@ void CombineHandler::write(char * buf, uint32_t length) {
   uint32_t outputRecordCount = 0;
   while (remain > 0) {
     kv = (KVBuffer *)pos;
-    kv->keyLength = bswap(kv->keyLength);
-    kv->valueLength = bswap(kv->valueLength);
+    kv->keyLength = hadoop_be32toh(kv->keyLength);
+    kv->valueLength = hadoop_be32toh(kv->valueLength);
     _writer->write(kv->getKey(), kv->keyLength, kv->getValue(), kv->valueLength);
     outputRecordCount++;
     remain -= kv->length();
@@ -30,7 +30,7 @@ using std::vector;
 namespace NativeTask {
 
 MCollectorOutputHandler::MCollectorOutputHandler()
-    : _collector(NULL), _dest(NULL), _endium(LARGE_ENDIUM) {
+    : _collector(NULL), _dest(NULL) {
 }
 
 MCollectorOutputHandler::~MCollectorOutputHandler() {
@@ -73,11 +73,9 @@ void MCollectorOutputHandler::handleInput(ByteBuffer & in) {
     THROW_EXCEPTION(IOException, "k/v meta information incomplete");
   }
 
-  if (_endium == LARGE_ENDIUM) {
-    kvBuffer->partitionId = bswap(kvBuffer->partitionId);
-    kvBuffer->buffer.keyLength = bswap(kvBuffer->buffer.keyLength);
-    kvBuffer->buffer.valueLength = bswap(kvBuffer->buffer.valueLength);
-  }
+  kvBuffer->partitionId = hadoop_be32toh(kvBuffer->partitionId);
+  kvBuffer->buffer.keyLength = hadoop_be32toh(kvBuffer->buffer.keyLength);
+  kvBuffer->buffer.valueLength = hadoop_be32toh(kvBuffer->buffer.valueLength);
 
   uint32_t kvLength = kvBuffer->buffer.length();
 
@@ -35,8 +35,6 @@ private:
   // state info for large KV pairs
   char * _dest;
 
-  Endium _endium;
-
 public:
   MCollectorOutputHandler();
   virtual ~MCollectorOutputHandler();
@@ -115,7 +115,7 @@ public:
    * read uint32_t big endian
    */
   inline uint32_t read_uint32_be() {
-    return bswap(read_uint32_le());
+    return hadoop_be32toh(read_uint32_le());
   }
 };
 
@@ -198,7 +198,7 @@ public:
   }
 
   inline void write_uint32_be(uint32_t v) {
-    write_uint32_le(bswap(v));
+    write_uint32_le(hadoop_be32toh(v));
   }
 
   inline void write_uint64_le(uint64_t v) {
@@ -211,7 +211,7 @@ public:
   }
 
   inline void write_uint64_be(uint64_t v) {
-    write_uint64_le(bswap64(v));
+    write_uint64_le(hadoop_be64toh(v));
   }
 
   inline void write_vlong(int64_t v) {
@@ -278,12 +278,11 @@ struct KVBuffer {
   }
 
   uint32_t length() {
-    return keyLength + valueLength + SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+    return keyLength + valueLength + SIZE_OF_KV_LENGTH;
   }
 
   uint32_t lengthConvertEndium() {
-    long value = bswap64(*((long *)this));
-    return (value >> 32) + value + SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+    return hadoop_be32toh(keyLength) + hadoop_be32toh(valueLength) + SIZE_OF_KV_LENGTH;
   }
 
   void fill(const void * key, uint32_t keylen, const void * value, uint32_t vallen) {
@@ -299,7 +298,7 @@ struct KVBuffer {
   }
 
   static uint32_t headerLength() {
-    return SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH;
+    return SIZE_OF_KV_LENGTH;
   }
 };
 
@@ -60,7 +60,7 @@ bool IFileReader::nextPartition() {
     if (4 != _stream->readFully(&chsum, 4)) {
       THROW_EXCEPTION(IOException, "read ifile checksum failed");
     }
-    uint32_t actual = bswap(chsum);
+    uint32_t actual = hadoop_be32toh(chsum);
     uint32_t expect = _source->getChecksum();
     if (actual != expect) {
       THROW_EXCEPTION_EX(IOException, "read ifile checksum not match, actual %x expect %x", actual,
@@ -130,7 +130,7 @@ void IFileWriter::endPartition() {
   }
 
   uint32_t chsum = _dest->getChecksum();
-  chsum = bswap(chsum);
+  chsum = hadoop_be32toh(chsum);
   _stream->write(&chsum, sizeof(chsum));
   _stream->flush();
   IFileSegment * info = &(_spillFileSegments[_spillFileSegments.size() - 1]);
@@ -74,7 +74,7 @@ public:
       keyLen = WritableUtils::ReadVInt(kvbuff, len);
       break;
     case BytesType:
-      keyLen = bswap(*(uint32_t*)kvbuff);
+      keyLen = hadoop_be32toh(*(uint32_t*)kvbuff);
       len = 4;
       break;
     default:
@@ -89,7 +89,7 @@ public:
       _valuePos = vbuff + len;
       break;
     case BytesType:
-      _valueLen = bswap(*(uint32_t*)vbuff);
+      _valueLen = hadoop_be32toh(*(uint32_t*)vbuff);
       _valuePos = vbuff + 4;
       break;
     default:
@@ -317,8 +317,8 @@ int NativeObjectFactory::IntComparator(const char * src, uint32_t srcLength, con
     uint32_t destLength) {
   int result = (*src) - (*dest);
   if (result == 0) {
-    uint32_t from = bswap(*(uint32_t*)src);
-    uint32_t to = bswap(*(uint32_t*)dest);
+    uint32_t from = hadoop_be32toh(*(uint32_t*)src);
+    uint32_t to = hadoop_be32toh(*(uint32_t*)dest);
     if (from > to) {
       return 1;
     } else if (from == to) {
@@ -335,8 +335,8 @@ int NativeObjectFactory::LongComparator(const char * src, uint32_t srcLength, co
   int result = (int)(*src) - (int)(*dest);
   if (result == 0) {
 
-    uint64_t from = bswap64(*(uint64_t*)src);
-    uint64_t to = bswap64(*(uint64_t*)dest);
+    uint64_t from = hadoop_be64toh(*(uint64_t*)src);
+    uint64_t to = hadoop_be64toh(*(uint64_t*)dest);
     if (from > to) {
       return 1;
     } else if (from == to) {
@@ -380,8 +380,8 @@ int NativeObjectFactory::FloatComparator(const char * src, uint32_t srcLength, c
     THROW_EXCEPTION_EX(IOException, "float comparator, while src/dest lengt is not 4");
   }
 
-  uint32_t from = bswap(*(uint32_t*)src);
-  uint32_t to = bswap(*(uint32_t*)dest);
+  uint32_t from = hadoop_be32toh(*(uint32_t*)src);
+  uint32_t to = hadoop_be32toh(*(uint32_t*)dest);
 
   float * srcValue = (float *)(&from);
   float * destValue = (float *)(&to);
@@ -401,8 +401,8 @@ int NativeObjectFactory::DoubleComparator(const char * src, uint32_t srcLength,
     THROW_EXCEPTION_EX(IOException, "double comparator, while src/dest lengt is not 4");
   }
 
-  uint64_t from = bswap64(*(uint64_t*)src);
-  uint64_t to = bswap64(*(uint64_t*)dest);
+  uint64_t from = hadoop_be64toh(*(uint64_t*)src);
+  uint64_t to = hadoop_be64toh(*(uint64_t*)dest);
 
   double * srcValue = (double *)(&from);
   double * destValue = (double *)(&to);
@@ -58,10 +58,10 @@ void SingleSpillInfo::writeSpillInfo(const std::string & filepath) {
     appendBuffer.flush();
     uint32_t chsum = dest.getChecksum();
 #ifdef SPILLRECORD_CHECKSUM_UINT
-    chsum = bswap(chsum);
+    chsum = hadoop_be32toh(chsum);
     fout->write(&chsum, sizeof(uint32_t));
 #else
-    uint64_t wtchsum = bswap64((uint64_t)chsum);
+    uint64_t wtchsum = hadoop_be64toh((uint64_t)chsum);
     fout->write(&wtchsum, sizeof(uint64_t));
 #endif
   }
@@ -41,6 +41,7 @@
 #include <map>
 #include <algorithm>
 
+#include "hadoop_endian.h"
 #include "lib/primitives.h"
 #include "lib/Log.h"
 #include "NativeTask.h"
@@ -28,6 +28,7 @@
 #include <stdint.h>
 #include <assert.h>
 #include <string>
+#include "hadoop_endian.h"
 
 #ifdef __GNUC__
 #define likely(x) __builtin_expect((x),1)
@@ -93,39 +94,6 @@ inline void simple_memcpy(void * dest, const void * src, size_t len) {
 
 #endif
 
-/**
- * little-endian to big-endian or vice versa
- */
-inline uint32_t bswap(uint32_t val) {
-#ifdef __aarch64__
-  __asm__("rev %w[dst], %w[src]" : [dst]"=r"(val) : [src]"r"(val));
-#else
-  __asm__("bswap %0" : "=r" (val) : "0" (val));
-#endif
-  return val;
-}
-
-inline uint64_t bswap64(uint64_t val) {
-#ifdef __aarch64__
-  __asm__("rev %[dst], %[src]" : [dst]"=r"(val) : [src]"r"(val));
-#else
-#ifdef __X64
-  __asm__("bswapq %0" : "=r" (val) : "0" (val));
-#else
-
-  uint64_t lower = val & 0xffffffffU;
-  uint32_t higher = (val >> 32) & 0xffffffffU;
-
-  lower = bswap(lower);
-  higher = bswap(higher);
-
-  return (lower << 32) + higher;
-
-#endif
-#endif
-  return val;
-}
-
 /**
  * Fast memcmp
  */
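The helpers removed above always reversed the byte order and only built on x86 and aarch64 because of the inline assembly, which baked in the assumption of a little-endian host. The hadoop_be32toh()/hadoop_be64toh() wrappers from hadoop_endian.h state the intended conversion (big-endian to host) instead, so they become no-ops on big-endian platforms such as SPARC. A minimal sketch of the semantic difference, with a portable expression in place of the old assembly (illustrative only):

    #include <stdint.h>
    #include <stdio.h>

    /* What the removed bswap() did: unconditionally reverse the byte order. */
    static inline uint32_t always_swap32(uint32_t v) {
      return (v >> 24) | ((v >> 8) & 0x0000FF00u) |
             ((v << 8) & 0x00FF0000u) | (v << 24);
    }

    /* What hadoop_be32toh() means: interpret a big-endian value on this host.
     * It only swaps when the host is little-endian. */
    static inline uint32_t be32_to_host(uint32_t v) {
      const union { uint16_t u16; uint8_t u8[2]; } probe = { 0x0102 };
      return (probe.u8[0] == 0x01) ? v          /* big-endian host: keep as-is */
                                   : always_swap32(v);
    }

    int main(void) {
      uint32_t big_endian_value = 0x11223344u;  /* value as stored on the wire */
      /* always_swap32() mangles it on a big-endian host; be32_to_host() does not. */
      printf("swap: 0x%08x  be32toh: 0x%08x\n",
             always_swap32(big_endian_value), be32_to_host(big_endian_value));
      return 0;
    }

On x86 the two functions produce identical results, which is why the unconditional swap appeared to work; on a big-endian host always_swap32() would corrupt values that are already in host order.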
@@ -158,16 +126,16 @@ inline int64_t fmemcmp(const char * src, const char * dest, uint32_t len) {
       return ((int64_t)src8[2] - (int64_t)dest8[2]);
     }
     case 4: {
-      return (int64_t)bswap(*(uint32_t*)src) - (int64_t)bswap(*(uint32_t*)dest);
+      return (int64_t)hadoop_be32toh(*(uint32_t*)src) - (int64_t)hadoop_be32toh(*(uint32_t*)dest);
     }
   }
   if (len < 8) {
-    int64_t ret = ((int64_t)bswap(*(uint32_t*)src) - (int64_t)bswap(*(uint32_t*)dest));
+    int64_t ret = ((int64_t)hadoop_be32toh(*(uint32_t*)src) - (int64_t)hadoop_be32toh(*(uint32_t*)dest));
     if (ret) {
       return ret;
     }
-    return ((int64_t)bswap(*(uint32_t*)(src + len - 4))
-        - (int64_t)bswap(*(uint32_t*)(dest + len - 4)));
+    return ((int64_t)hadoop_be32toh(*(uint32_t*)(src + len - 4))
+        - (int64_t)hadoop_be32toh(*(uint32_t*)(dest + len - 4)));
   }
   uint32_t cur = 0;
   uint32_t end = len & (0xffffffffU << 3);
@@ -175,8 +143,8 @@ inline int64_t fmemcmp(const char * src, const char * dest, uint32_t len) {
     uint64_t l = *(uint64_t*)(src8 + cur);
     uint64_t r = *(uint64_t*)(dest8 + cur);
     if (l != r) {
-      l = bswap64(l);
-      r = bswap64(r);
+      l = hadoop_be64toh(l);
+      r = hadoop_be64toh(r);
       return l > r ? 1 : -1;
     }
     cur += 8;
@@ -184,8 +152,8 @@ inline int64_t fmemcmp(const char * src, const char * dest, uint32_t len) {
   uint64_t l = *(uint64_t*)(src8 + len - 8);
   uint64_t r = *(uint64_t*)(dest8 + len - 8);
   if (l != r) {
-    l = bswap64(l);
-    r = bswap64(r);
+    l = hadoop_be64toh(l);
+    r = hadoop_be64toh(r);
     return l > r ? 1 : -1;
   }
   return 0;
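fmemcmp() compares eight bytes at a time and, when two words differ, decides the ordering arithmetically. Converting both words with hadoop_be64toh() first makes the numeric comparison agree with byte-wise (memcmp) ordering on any host, because the first differing byte becomes the most significant one. A small self-contained illustration (demo_be64toh is an illustrative stand-in, not the configured macro):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Illustrative stand-in for hadoop_be64toh() applied to a word loaded from
     * memory: the first memory byte becomes the most significant byte. */
    static uint64_t demo_be64toh(uint64_t v) {
      uint8_t b[8];
      memcpy(b, &v, 8);
      return ((uint64_t)b[0] << 56) | ((uint64_t)b[1] << 48) |
             ((uint64_t)b[2] << 40) | ((uint64_t)b[3] << 32) |
             ((uint64_t)b[4] << 24) | ((uint64_t)b[5] << 16) |
             ((uint64_t)b[6] << 8)  |  (uint64_t)b[7];
    }

    int main(void) {
      const char a[9] = "abcdefgh";
      const char c[9] = "abZdefgh";          /* differs from a at index 2 */
      uint64_t l, r;
      memcpy(&l, a, 8);
      memcpy(&r, c, 8);
      /* Both comparisons agree: 'c' (0x63) > 'Z' (0x5a) at the first differing
       * byte, and the big-endian view makes that byte the most significant. */
      printf("memcmp: %d  word compare: %d\n",
             memcmp(a, c, 8) > 0, demo_be64toh(l) > demo_be64toh(r));
      return 0;
    }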
@@ -120,29 +120,29 @@ void WritableUtils::WriteVLongInner(int64_t v, char * pos, uint32_t & len) {
     len = 4;
   } else if (value < (1ULL << 32)) {
     *(pos++) = base - 3;
-    *(uint32_t*)(pos) = bswap((uint32_t)value);
+    *(uint32_t*)(pos) = hadoop_be32toh((uint32_t)value);
     len = 5;
   } else if (value < (1ULL << 40)) {
     *(pos++) = base - 4;
-    *(uint32_t*)(pos) = bswap((uint32_t)(value >> 8));
+    *(uint32_t*)(pos) = hadoop_be32toh((uint32_t)(value >> 8));
     *(uint8_t*)(pos + 4) = value;
     len = 6;
   } else if (value < (1ULL << 48)) {
     *(pos++) = base - 5;
-    *(uint32_t*)(pos) = bswap((uint32_t)(value >> 16));
+    *(uint32_t*)(pos) = hadoop_be32toh((uint32_t)(value >> 16));
     *(uint8_t*)(pos + 4) = value >> 8;
     *(uint8_t*)(pos + 5) = value;
     len = 7;
   } else if (value < (1ULL << 56)) {
     *(pos++) = base - 6;
-    *(uint32_t*)(pos) = bswap((uint32_t)(value >> 24));
+    *(uint32_t*)(pos) = hadoop_be32toh((uint32_t)(value >> 24));
     *(uint8_t*)(pos + 4) = value >> 16;
     *(uint8_t*)(pos + 5) = value >> 8;
     *(uint8_t*)(pos + 6) = value;
     len = 8;
   } else {
     *(pos++) = base - 7;
-    *(uint64_t*)pos = bswap64(value);
+    *(uint64_t*)pos = hadoop_be64toh(value);
     len = 9;
   }
 }
@@ -168,7 +168,7 @@ int64_t WritableUtils::ReadLong(InputStream * stream) {
   if (stream->readFully(&ret, 8) != 8) {
     THROW_EXCEPTION(IOException, "ReadLong reach EOF");
   }
-  return (int64_t)bswap64(ret);
+  return (int64_t)hadoop_be64toh(ret);
 }
 
 int32_t WritableUtils::ReadInt(InputStream * stream) {
@@ -176,7 +176,7 @@ int32_t WritableUtils::ReadInt(InputStream * stream) {
   if (stream->readFully(&ret, 4) != 4) {
     THROW_EXCEPTION(IOException, "ReadInt reach EOF");
   }
-  return (int32_t)bswap(ret);
+  return (int32_t)hadoop_be32toh(ret);
 }
 
 int16_t WritableUtils::ReadShort(InputStream * stream) {
@@ -192,7 +192,7 @@ float WritableUtils::ReadFloat(InputStream * stream) {
   if (stream->readFully(&ret, 4) != 4) {
     THROW_EXCEPTION(IOException, "ReadFloat reach EOF");
   }
-  ret = bswap(ret);
+  ret = hadoop_be32toh(ret);
   return *(float*)&ret;
 }
 
@@ -232,12 +232,12 @@ void WritableUtils::WriteVLong(OutputStream * stream, int64_t v) {
 }
 
 void WritableUtils::WriteLong(OutputStream * stream, int64_t v) {
-  uint64_t be = bswap64((uint64_t)v);
+  uint64_t be = hadoop_be64toh((uint64_t)v);
   stream->write(&be, 8);
 }
 
 void WritableUtils::WriteInt(OutputStream * stream, int32_t v) {
-  uint32_t be = bswap((uint32_t)v);
+  uint32_t be = hadoop_be32toh((uint32_t)v);
   stream->write(&be, 4);
 }
 
@@ -249,7 +249,7 @@ void WritableUtils::WriteShort(OutputStream * stream, int16_t v) {
 
 void WritableUtils::WriteFloat(OutputStream * stream, float v) {
   uint32_t intv = *(uint32_t*)&v;
-  intv = bswap(intv);
+  intv = hadoop_be32toh(intv);
   stream->write(&intv, 4);
 }
 
@@ -286,10 +286,10 @@ void WritableUtils::toString(string & dest, KeyValueType type, const void * data
     dest.append(*(uint8_t*)data ? "true" : "false");
     break;
   case IntType:
-    dest.append(StringUtil::ToString((int32_t)bswap(*(uint32_t*)data)));
+    dest.append(StringUtil::ToString((int32_t)hadoop_be32toh(*(uint32_t*)data)));
     break;
   case LongType:
-    dest.append(StringUtil::ToString((int64_t)bswap64(*(uint64_t*)data)));
+    dest.append(StringUtil::ToString((int64_t)hadoop_be64toh(*(uint64_t*)data)));
     break;
   case FloatType:
     dest.append(StringUtil::ToString(*(float*)data));
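The WritableUtils routines keep Java's DataInput/DataOutput convention of big-endian integers on the wire: a value is converted with hadoop_be32toh()/hadoop_be64toh() (byte swapping is its own inverse, so the same call doubles as host-to-big-endian) and the raw bytes are then written. A stand-alone sketch of the resulting byte order (demo_be32 is an illustrative stand-in):

    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    /* Illustrative stand-in for hadoop_be32toh(): swap on little-endian hosts,
     * identity on big-endian hosts. */
    static uint32_t demo_be32(uint32_t v) {
      const union { uint16_t u16; uint8_t u8[2]; } probe = { 0x0102 };
      if (probe.u8[0] == 0x01) return v;            /* big-endian host */
      return (v >> 24) | ((v >> 8) & 0x0000FF00u) |
             ((v << 8) & 0x00FF0000u) | (v << 24);  /* little-endian host */
    }

    int main(void) {
      uint8_t wire[4];
      uint32_t be = demo_be32(0x01020304u);   /* what WriteInt would store */
      memcpy(wire, &be, 4);
      /* Prints 01 02 03 04 on any host: the Java DataOutput byte order. */
      printf("%02x %02x %02x %02x\n", wire[0], wire[1], wire[2], wire[3]);
      return 0;
    }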
@@ -190,7 +190,7 @@ TEST(IFile, TestGlibCBug) {
   reader->nextPartition();
   uint32_t index = 0;
   while (NULL != (key = reader->nextKey(length))) {
-    int32_t realKey = (int32_t)bswap(*(uint32_t *)(key));
+    int32_t realKey = (int32_t)hadoop_be32toh(*(uint32_t *)(key));
     ASSERT_LT(index, 5);
     ASSERT_EQ(expect[index], realKey);
     index++;
@@ -38,8 +38,8 @@ inline int fmemcmporig(const char * src, const char * dest, uint32_t len) {
     uint64_t l = *src8;
     uint64_t r = *dest8;
     if (l != r) {
-      l = bswap64(l);
-      r = bswap64(r);
+      l = hadoop_be64toh(l);
+      r = hadoop_be64toh(r);
       return l > r ? 1 : -1;
     }
     ++src8;
@@ -59,8 +59,8 @@ inline int fmemcmporig(const char * src, const char * dest, uint32_t len) {
   if (l == r) {
     return 0;
   }
-  l = bswap64(l);
-  r = bswap64(r);
+  l = hadoop_be64toh(l);
+  r = hadoop_be64toh(r);
   return l > r ? 1 : -1;
 }
 
@@ -43,8 +43,8 @@ TEST(KVBuffer, test) {
   ASSERT_EQ(8, kv1->getKey() - buff);
   ASSERT_EQ(strlen(KEY) + 8, kv1->getValue() - buff);
 
-  kv1->keyLength = bswap(kv1->keyLength);
-  kv1->valueLength = bswap(kv1->valueLength);
+  kv1->keyLength = hadoop_be32toh(kv1->keyLength);
+  kv1->valueLength = hadoop_be32toh(kv1->valueLength);
 
   ASSERT_EQ(8, kv1->headerLength());
   ASSERT_EQ(strlen(KEY) + strlen(VALUE) + 8, kv1->lengthConvertEndium());

@@ -59,7 +59,7 @@ class MemoryBlockFactory {
       kv->keyLength = 4;
       kv->valueLength = 4;
       uint32_t * key = (uint32_t *)kv->getKey();
-      *key = bswap(index);
+      *key = hadoop_be32toh(index);
     }
     return block1;
   }
@@ -85,17 +85,17 @@ TEST(MemoryBlock, sort) {
   medium->keyLength = 4;
   medium->valueLength = 4;
   uint32_t * mediumKey = (uint32_t *)medium->getKey();
-  *mediumKey = bswap(MEDIUM);
+  *mediumKey = hadoop_be32toh(MEDIUM);
 
   small->keyLength = 4;
   small->valueLength = 4;
   uint32_t * smallKey = (uint32_t *)small->getKey();
-  *smallKey = bswap(SMALL);
+  *smallKey = hadoop_be32toh(SMALL);
 
   big->keyLength = 4;
   big->valueLength = 4;
   uint32_t * bigKey = (uint32_t *)big->getKey();
-  *bigKey = bswap(BIG);
+  *bigKey = hadoop_be32toh(BIG);
 
   ComparatorPtr bytesComparator = NativeTask::get_comparator(BytesType, NULL);
   block.sort(CPPSORT, bytesComparator);
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-#include "lib/commons.h"
+#include "hadoop_endian.h"
 #include "test_commons.h"
 #include "lib/PartitionBucket.h"
 #include "lib/PartitionBucketIterator.h"
@@ -129,15 +129,15 @@ TEST(PartitionBucket, sort) {
   const uint32_t BIG = 1000;
 
   kv1->keyLength = 4;
-  *((uint32_t *)kv1->getKey()) = bswap(BIG);
+  *((uint32_t *)kv1->getKey()) = hadoop_be32toh(BIG);
   kv1->valueLength = KV_SIZE - kv1->headerLength() - kv1->keyLength;
 
   kv2->keyLength = 4;
-  *((uint32_t *)kv2->getKey()) = bswap(SMALL);
+  *((uint32_t *)kv2->getKey()) = hadoop_be32toh(SMALL);
   kv2->valueLength = KV_SIZE - kv2->headerLength() - kv2->keyLength;
 
   kv3->keyLength = 4;
-  *((uint32_t *)kv3->getKey()) = bswap(MEDIUM);
+  *((uint32_t *)kv3->getKey()) = hadoop_be32toh(MEDIUM);
   kv3->valueLength = KV_SIZE - kv3->headerLength() - kv3->keyLength;
 
   bucket->sort(DUALPIVOTSORT);
@@ -148,13 +148,13 @@ TEST(PartitionBucket, sort) {
   Buffer value;
   iter->next(key, value);
 
-  ASSERT_EQ(SMALL, bswap(*(uint32_t * )key.data()));
+  ASSERT_EQ(SMALL, hadoop_be32toh(*(uint32_t * )key.data()));
 
   iter->next(key, value);
-  ASSERT_EQ(MEDIUM, bswap(*(uint32_t * )key.data()));
+  ASSERT_EQ(MEDIUM, hadoop_be32toh(*(uint32_t * )key.data()));
 
   iter->next(key, value);
-  ASSERT_EQ(BIG, bswap(*(uint32_t * )key.data()));
+  ASSERT_EQ(BIG, hadoop_be32toh(*(uint32_t * )key.data()));
 
   delete iter;
   delete bucket;
@@ -181,15 +181,15 @@ TEST(PartitionBucket, spill) {
   const uint32_t BIG = 1000;
 
   kv1->keyLength = 4;
-  *((uint32_t *)kv1->getKey()) = bswap(BIG);
+  *((uint32_t *)kv1->getKey()) = hadoop_be32toh(BIG);
   kv1->valueLength = KV_SIZE - KVBuffer::headerLength() - kv1->keyLength;
 
   kv2->keyLength = 4;
-  *((uint32_t *)kv2->getKey()) = bswap(SMALL);
+  *((uint32_t *)kv2->getKey()) = hadoop_be32toh(SMALL);
   kv2->valueLength = KV_SIZE - KVBuffer::headerLength() - kv2->keyLength;
 
   kv3->keyLength = 4;
-  *((uint32_t *)kv3->getKey()) = bswap(MEDIUM);
+  *((uint32_t *)kv3->getKey()) = hadoop_be32toh(MEDIUM);
   kv3->valueLength = KV_SIZE - KVBuffer::headerLength() - kv3->keyLength;
 
   bucket->sort(DUALPIVOTSORT);
@@ -203,17 +203,17 @@ TEST(PartitionBucket, spill) {
   KVBuffer * first = (KVBuffer *)writer.buff();
   ASSERT_EQ(4, first->keyLength);
   ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, first->valueLength);
-  ASSERT_EQ(bswap(SMALL), (*(uint32_t * )(first->getKey())));
+  ASSERT_EQ(hadoop_be32toh(SMALL), (*(uint32_t * )(first->getKey())));
 
   KVBuffer * second = first->next();
   ASSERT_EQ(4, second->keyLength);
   ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, second->valueLength);
-  ASSERT_EQ(bswap(MEDIUM), (*(uint32_t * )(second->getKey())));
+  ASSERT_EQ(hadoop_be32toh(MEDIUM), (*(uint32_t * )(second->getKey())));
 
   KVBuffer * third = second->next();
   ASSERT_EQ(4, third->keyLength);
   ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, third->valueLength);
-  ASSERT_EQ(bswap(BIG), (*(uint32_t * )(third->getKey())));
+  ASSERT_EQ(hadoop_be32toh(BIG), (*(uint32_t * )(third->getKey())));
 
   delete [] buff;
   delete bucket;