MAPREDUCE-6578. Add support for HDFS heterogeneous storage testing to TestDFSIO. Contributed by Wei Zhou and Sammi Chen

Kai Zheng 2016-08-24 22:17:05 +08:00
parent 793447f799
commit 0ce1ab95cc

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java

@@ -29,6 +29,7 @@
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.text.DecimalFormat;
+import java.util.Collection;
 import java.util.Date;
 import java.util.Random;
 import java.util.StringTokenizer;
@@ -36,7 +37,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
@@ -102,7 +105,8 @@ public class TestDFSIO implements Tool {
       " [-compression codecClassName]" +
       " [-nrFiles N]" +
       " [-size Size[B|KB|MB|GB|TB]]" +
-      " [-resFile resultFileName] [-bufferSize Bytes]";
+      " [-resFile resultFileName] [-bufferSize Bytes]" +
+      " [-storagePolicy storagePolicyName]";
 
   private Configuration config;
 
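
With the new flag, a run can pin the benchmark files to a specific HDFS storage policy straight from the command line. A sketch of a write-test invocation (jar name, file count, and size are illustrative, not part of this patch):

  hadoop jar hadoop-mapreduce-client-jobclient-*-tests.jar TestDFSIO \
      -write -nrFiles 8 -size 128MB -storagePolicy ONE_SSD
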
@@ -326,6 +330,7 @@ private static String getFileName(int fIdx) {
    */
   private abstract static class IOStatMapper extends IOMapperBase<Long> {
     protected CompressionCodec compressionCodec;
+    protected String blockStoragePolicy;
 
     IOStatMapper() {
     }
@@ -350,6 +355,8 @@ public void configure(JobConf conf) {
         compressionCodec = (CompressionCodec)
             ReflectionUtils.newInstance(codec, getConf());
       }
+
+      blockStoragePolicy = getConf().get("test.io.block.storage.policy", null);
     }
 
     @Override // IOMapperBase
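
Each map task runs in its own JVM, so the driver hands the policy name to the mappers through the job configuration under the "test.io.block.storage.policy" key added above. A minimal sketch of that hand-off, assuming only hadoop-common on the classpath (the class and policy name are made up):

import org.apache.hadoop.conf.Configuration;

public class ConfHandOffSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Driver side (run()): record the requested policy in the job conf.
    conf.set("test.io.block.storage.policy", "ONE_SSD");
    // Mapper side (configure()): read it back; null means "no policy set".
    String policy = conf.get("test.io.block.storage.policy", null);
    System.out.println("policy = " + policy);
  }
}
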
@@ -389,8 +396,11 @@ public WriteMapper() {
     @Override // IOMapperBase
     public Closeable getIOStream(String name) throws IOException {
       // create file
-      OutputStream out =
-          fs.create(new Path(getDataDir(getConf()), name), true, bufferSize);
+      Path filePath = new Path(getDataDir(getConf()), name);
+      OutputStream out = fs.create(filePath, true, bufferSize);
+      if (blockStoragePolicy != null) {
+        fs.setStoragePolicy(filePath, blockStoragePolicy);
+      }
       if(compressionCodec != null)
         out = compressionCodec.createOutputStream(out);
       LOG.info("out = " + out.getClass().getName());
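
The write path sets the policy right after creating the file and before any bytes go out, so every block of the file is placed according to the policy (HDFS applies a policy to blocks written after it is set; already-written blocks are only relocated by the mover tool). A standalone sketch of the same create-then-pin pattern, with a made-up path and policy name:

import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PolicyWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);      // assumes HDFS is the default FS
    Path file = new Path("/tmp/policy-demo");  // illustrative path
    OutputStream out = fs.create(file, true, 4096);
    fs.setStoragePolicy(file, "ONE_SSD");      // pin before writing any data
    try {
      out.write(new byte[4096]);
    } finally {
      out.close();
    }
  }
}
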
@@ -713,8 +723,9 @@ public static void main(String[] args) {
       System.err.print(StringUtils.stringifyException(e));
       res = -2;
     }
-    if(res == -1)
-      System.err.print(USAGE);
+    if (res == -1) {
+      System.err.println(USAGE);
+    }
     System.exit(res);
   }
 
@@ -727,6 +738,7 @@ public int run(String[] args) throws IOException {
     long skipSize = 0;
     String resFileName = DEFAULT_RES_FILE_NAME;
     String compressionClass = null;
+    String storagePolicy = null;
     boolean isSequential = false;
     String version = TestDFSIO.class.getSimpleName() + ".1.8";
@@ -771,6 +783,8 @@ public int run(String[] args) throws IOException {
         bufferSize = Integer.parseInt(args[++i]);
       } else if (args[i].equalsIgnoreCase("-resfile")) {
         resFileName = args[++i];
+      } else if (args[i].equalsIgnoreCase("-storagePolicy")) {
+        storagePolicy = args[++i];
       } else {
         System.err.println("Illegal argument: " + args[i]);
         return -1;
@@ -799,6 +813,33 @@ else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
     config.setLong("test.io.skip.size", skipSize);
     FileSystem fs = FileSystem.get(config);
 
+    if (storagePolicy != null) {
+      boolean isValid = false;
+      Collection<BlockStoragePolicy> storagePolicies =
+          ((DistributedFileSystem) fs).getAllStoragePolicies();
+      try {
+        for (BlockStoragePolicy policy : storagePolicies) {
+          if (policy.getName().equals(storagePolicy)) {
+            isValid = true;
+            break;
+          }
+        }
+      } catch (Exception e) {
+        throw new IOException("Get block storage policies error: ", e);
+      }
+
+      if (!isValid) {
+        System.out.println("Invalid block storage policy: " + storagePolicy);
+        System.out.println("Current supported storage policy list: ");
+        for (BlockStoragePolicy policy : storagePolicies) {
+          System.out.println(policy.getName());
+        }
+        return -1;
+      }
+
+      config.set("test.io.block.storage.policy", storagePolicy);
+      LOG.info("storagePolicy = " + storagePolicy);
+    }
+
     if (isSequential) {
       long tStart = System.currentTimeMillis();
       sequentialTest(fs, testType, nrBytes, nrFiles);
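
The names accepted by -storagePolicy are whatever the NameNode reports; on a stock cluster that is typically HOT, WARM, COLD, ONE_SSD, ALL_SSD, and LAZY_PERSIST. A minimal sketch that prints them, mirroring the validation loop above (the class name is made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;

public class ListPoliciesSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Policy listing is HDFS-specific, hence the same cast the patch uses.
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(conf);
    for (BlockStoragePolicy policy : dfs.getAllStoragePolicies()) {
      System.out.println(policy.getName());
    }
  }
}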