MAPREDUCE-6054. native-task: Speed up tests. Contributed by Todd Lipcon.

This commit is contained in:
Todd Lipcon 2014-08-27 12:25:07 -07:00
parent fad4524c85
commit bfd1d75d87
8 changed files with 124 additions and 88 deletions

View File

@ -16,4 +16,4 @@ MAPREDUCE-6006. native-task: add native tests to maven and fix bug in pom.xml (B
MAPREDUCE-6026. native-task: fix logging (Manu Zhang via todd)
MAPREDUCE-6035. native-task: sources/test-sources jar distribution (Manu Zhang via todd)
MAPREDUCE-5977. Fix or suppress native-task gcc warnings (Manu Zhang via todd)
MAPREDUCE-6054. native-task: Speed up tests (todd)

View File

@ -23,6 +23,9 @@
public class BytesUtil {
private static final char[] HEX_CHARS =
"0123456789abcdef".toCharArray();
/**
* Converts a big-endian byte array to a long value.
*
@ -124,7 +127,9 @@ public static String toStringBinary(final byte [] b, int off, int len) {
|| " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0 ) {
result.append((char)ch);
} else {
result.append(String.format("\\x%02X", ch));
result.append("\\x");
result.append(HEX_CHARS[(ch >> 4) & 0x0F]);
result.append(HEX_CHARS[ch & 0x0F]);
}
}
return result.toString();

View File

@ -20,7 +20,11 @@
import java.io.IOException;
import java.util.zip.CRC32;
import com.google.common.base.Stopwatch;
import com.google.common.primitives.Longs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -36,6 +40,7 @@
public class KVJob {
public static final String INPUTPATH = "nativetask.kvtest.inputfile.path";
public static final String OUTPUTPATH = "nativetask.kvtest.outputfile.path";
private static final Log LOG = LogFactory.getLog(KVJob.class);
Job job = null;
public static class ValueMapper<KTYPE, VTYPE> extends Mapper<KTYPE, VTYPE, KTYPE, VTYPE> {
@ -82,8 +87,9 @@ public KVJob(String jobname, Configuration conf, Class<?> keyclass, Class<?> val
final TestInputFile testfile = new TestInputFile(Integer.valueOf(conf.get(
TestConstants.FILESIZE_KEY, "1000")),
keyclass.getName(), valueclass.getName(), conf);
Stopwatch sw = new Stopwatch().start();
testfile.createSequenceTestFile(inputpath);
LOG.info("Created test file " + inputpath + " in " + sw.elapsedMillis() + "ms");
}
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.addInputPath(job, new Path(inputpath));

View File

@ -20,32 +20,29 @@
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.nativetask.testutil.ResultVerifier;
import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
@RunWith(Parameterized.class)
public class KVTest {
private static final Log LOG = LogFactory.getLog(KVTest.class);
private static Class<?>[] keyclasses = null;
private static Class<?>[] valueclasses = null;
private static String[] keyclassNames = null;
private static String[] valueclassNames = null;
private static Configuration nativekvtestconf = ScenarioConfiguration.getNativeConfiguration();
private static Configuration hadoopkvtestconf = ScenarioConfiguration.getNormalConfiguration();
static {
@ -53,50 +50,46 @@ public class KVTest {
hadoopkvtestconf.addResource(TestConstants.KVTEST_CONF_PATH);
}
private static List<Class<?>> parseClassNames(String spec) {
List<Class<?>> ret = Lists.newArrayList();
Iterable<String> classNames = Splitter.on(';').trimResults()
.omitEmptyStrings().split(spec);
for (String className : classNames) {
try {
ret.add(Class.forName(className));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
return ret;
}
/**
* Parameterize the test with the specified key and value types.
*/
@Parameters(name = "key:{0}\nvalue:{1}")
public static Iterable<Class<?>[]> data() {
final String valueclassesStr = nativekvtestconf
public static Iterable<Class<?>[]> data() throws Exception {
// Parse the config.
final String valueClassesStr = nativekvtestconf
.get(TestConstants.NATIVETASK_KVTEST_VALUECLASSES);
LOG.info(valueclassesStr);
valueclassNames = valueclassesStr.replaceAll("\\s", "").split(";");// delete
// " "
final ArrayList<Class<?>> tmpvalueclasses = new ArrayList<Class<?>>();
for (int i = 0; i < valueclassNames.length; i++) {
try {
if (valueclassNames[i].equals("")) {
continue;
}
tmpvalueclasses.add(Class.forName(valueclassNames[i]));
} catch (final ClassNotFoundException e) {
e.printStackTrace();
}
LOG.info("Parameterizing with value classes: " + valueClassesStr);
List<Class<?>> valueClasses = parseClassNames(valueClassesStr);
final String keyClassesStr = nativekvtestconf.get(
TestConstants.NATIVETASK_KVTEST_KEYCLASSES);
LOG.info("Parameterizing with key classes: " + keyClassesStr);
List<Class<?>> keyClasses = parseClassNames(keyClassesStr);
// Generate an entry for each key type.
List<Class<?>[]> pairs = Lists.newArrayList();
for (Class<?> keyClass : keyClasses) {
pairs.add(new Class<?>[]{ keyClass, LongWritable.class });
}
valueclasses = tmpvalueclasses.toArray(new Class[tmpvalueclasses.size()]);
final String keyclassesStr = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_KEYCLASSES);
LOG.info(keyclassesStr);
keyclassNames = keyclassesStr.replaceAll("\\s", "").split(";");// delete
// " "
final ArrayList<Class<?>> tmpkeyclasses = new ArrayList<Class<?>>();
for (int i = 0; i < keyclassNames.length; i++) {
try {
if (keyclassNames[i].equals("")) {
continue;
}
tmpkeyclasses.add(Class.forName(keyclassNames[i]));
} catch (final ClassNotFoundException e) {
e.printStackTrace();
}
// ...and for each value type.
for (Class<?> valueClass : valueClasses) {
pairs.add(new Class<?>[]{ LongWritable.class, valueClass });
}
keyclasses = tmpkeyclasses.toArray(new Class[tmpkeyclasses.size()]);
final Class<?>[][] kvgroup = new Class<?>[keyclassNames.length * valueclassNames.length][2];
for (int i = 0; i < keyclassNames.length; i++) {
final int tmpindex = i * valueclassNames.length;
for (int j = 0; j < valueclassNames.length; j++) {
kvgroup[tmpindex + j][0] = keyclasses[i];
kvgroup[tmpindex + j][1] = valueclasses[j];
}
}
return Arrays.asList(kvgroup);
return pairs;
}
private final Class<?> keyclass;
@ -105,12 +98,10 @@ public static Iterable<Class<?>[]> data() {
public KVTest(Class<?> keyclass, Class<?> valueclass) {
this.keyclass = keyclass;
this.valueclass = valueclass;
}
@Test
public void testKVCompability() {
try {
final String nativeoutput = this.runNativeTest(
"Test:" + keyclass.getSimpleName() + "--" + valueclass.getSimpleName(), keyclass, valueclass);
@ -135,11 +126,6 @@ public void testKVCompability() {
}
}
@Before
public void startUp() {
}
private String runNativeTest(String jobname, Class<?> keyclass, Class<?> valueclass) throws IOException {
final String inputpath = nativekvtestconf.get(TestConstants.NATIVETASK_KVTEST_INPUTDIR) + "/"
+ keyclass.getName()

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.testutil.BytesFactory;
import org.apache.hadoop.mapred.nativetask.testutil.ScenarioConfiguration;
import org.apache.hadoop.mapred.nativetask.testutil.TestConstants;
@ -123,7 +124,7 @@ public void createSequenceTestFile(String filepath, int base) throws Exception {
}
public void createSequenceTestFile(String filepath, int base, byte start) throws Exception {
LOG.info("create file " + filepath);
LOG.info("creating file " + filepath + "(" + filesize + " bytes)");
LOG.info(keyClsName + " " + valueClsName);
Class<?> tmpkeycls, tmpvaluecls;
try {
@ -178,6 +179,9 @@ private int flushBuf(int buflen) throws Exception {
int valuebytesnum = 0;
int offset = 0;
Writable keyWritable = BytesFactory.newObject(null, keyClsName);
Writable valWritable = BytesFactory.newObject(null, valueClsName);
while (offset < buflen) {
final int remains = buflen - offset;
keybytesnum = keyMaxBytesNum;
@ -202,9 +206,12 @@ private int flushBuf(int buflen) throws Exception {
System.arraycopy(databuf, offset, value, 0, valuebytesnum);
offset += valuebytesnum;
BytesFactory.updateObject(keyWritable, key);
BytesFactory.updateObject(valWritable, value);
try {
writer.append(BytesFactory.newObject(key, this.keyClsName), BytesFactory.newObject(value, this.valueClsName));
writer.append(keyWritable, valWritable);
} catch (final IOException e) {
e.printStackTrace();
throw new Exception("sequence file create failed", e);

View File

@ -19,8 +19,10 @@
import java.util.Random;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
@ -32,43 +34,58 @@
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.VIntWritable;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.nativetask.util.BytesUtil;
public class BytesFactory {
public static Random r = new Random();
public static Object newObject(byte[] seed, String className) {
r.setSeed(seed.hashCode());
if (className.equals(IntWritable.class.getName())) {
return new IntWritable(Ints.fromByteArray(seed));
} else if (className.equals(FloatWritable.class.getName())) {
return new FloatWritable(r.nextFloat());
} else if (className.equals(DoubleWritable.class.getName())) {
return new DoubleWritable(r.nextDouble());
} else if (className.equals(LongWritable.class.getName())) {
return new LongWritable(Longs.fromByteArray(seed));
} else if (className.equals(VIntWritable.class.getName())) {
return new VIntWritable(Ints.fromByteArray(seed));
} else if (className.equals(VLongWritable.class.getName())) {
return new VLongWritable(Longs.fromByteArray(seed));
} else if (className.equals(BooleanWritable.class.getName())) {
return new BooleanWritable(seed[0] % 2 == 1 ? true : false);
} else if (className.equals(Text.class.getName())) {
return new Text(BytesUtil.toStringBinary(seed));
} else if (className.equals(ByteWritable.class.getName())) {
return new ByteWritable(seed.length > 0 ? seed[0] : 0);
} else if (className.equals(BytesWritable.class.getName())) {
return new BytesWritable(seed);
} else if (className.equals(UTF8.class.getName())) {
return new UTF8(BytesUtil.toStringBinary(seed));
} else if (className.equals(MockValueClass.class.getName())) {
return new MockValueClass(seed);
public static void updateObject(Writable obj, byte[] seed) {
if (obj instanceof IntWritable) {
((IntWritable)obj).set(Ints.fromByteArray(seed));
} else if (obj instanceof FloatWritable) {
((FloatWritable)obj).set(r.nextFloat());
} else if (obj instanceof DoubleWritable) {
((DoubleWritable)obj).set(r.nextDouble());
} else if (obj instanceof LongWritable) {
((LongWritable)obj).set(Longs.fromByteArray(seed));
} else if (obj instanceof VIntWritable) {
((VIntWritable)obj).set(Ints.fromByteArray(seed));
} else if (obj instanceof VLongWritable) {
((VLongWritable)obj).set(Longs.fromByteArray(seed));
} else if (obj instanceof BooleanWritable) {
((BooleanWritable)obj).set(seed[0] % 2 == 1 ? true : false);
} else if (obj instanceof Text) {
((Text)obj).set(BytesUtil.toStringBinary(seed));
} else if (obj instanceof ByteWritable) {
((ByteWritable)obj).set(seed.length > 0 ? seed[0] : 0);
} else if (obj instanceof BytesWritable) {
((BytesWritable)obj).set(seed, 0, seed.length);
} else if (obj instanceof UTF8) {
((UTF8)obj).set(BytesUtil.toStringBinary(seed));
} else if (obj instanceof MockValueClass) {
((MockValueClass)obj).set(seed);
} else {
return null;
throw new IllegalArgumentException("unknown writable: " +
obj.getClass().getName());
}
}
public static Writable newObject(byte[] seed, String className) {
Writable ret;
try {
Class<?> clazz = Class.forName(className);
Preconditions.checkArgument(Writable.class.isAssignableFrom(clazz));
ret = (Writable)clazz.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (seed != null) {
updateObject(ret, seed);
}
return ret;
}
public static <VTYPE> byte[] fromBytes(byte[] bytes) throws Exception {
throw new Exception("Not supported");

View File

@ -51,6 +51,14 @@ public MockValueClass(byte[] seed) {
txt = new Text(BytesUtil.toStringBinary(array));
}
public void set(byte[] seed) {
a = seed.length;
array = new byte[seed.length];
System.arraycopy(seed, 0, array, 0, seed.length);
longWritable.set(a);
txt.set(BytesUtil.toStringBinary(array));
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(a);

View File

@ -53,4 +53,11 @@ public void testBytesDoubleConversion() {
Assert.assertEquals(d, BytesUtil.toDouble(doubleBytes));
}
public void testToStringBinary() {
Assert.assertEquals("\\x01\\x02ABC",
BytesUtil.toStringBinary(new byte[] { 1, 2, 65, 66, 67 }));
Assert.assertEquals("\\x10\\x11",
BytesUtil.toStringBinary(new byte[] { 16, 17 }));
}
}