diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java index 05d4d77744..d218169280 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.SequenceFile.CompressionType; @@ -106,9 +107,14 @@ public class TestDFSIO implements Tool { " [-nrFiles N]" + " [-size Size[B|KB|MB|GB|TB]]" + " [-resFile resultFileName] [-bufferSize Bytes]" + - " [-storagePolicy storagePolicyName]"; + " [-storagePolicy storagePolicyName]" + + " [-erasureCodePolicy erasureCodePolicyName]"; private Configuration config; + private static final String STORAGE_POLICY_NAME_KEY = + "test.io.block.storage.policy"; + private static final String ERASURE_CODE_POLICY_NAME_KEY = + "test.io.erasure.code.policy"; static{ Configuration.addDefaultResource("hdfs-default.xml"); @@ -211,9 +217,9 @@ public static void beforeClass() throws Exception { bench = new TestDFSIO(); bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); cluster = new MiniDFSCluster.Builder(bench.getConf()) - .numDataNodes(2) - .format(true) - .build(); + .numDataNodes(2) + .format(true) + .build(); FileSystem fs = cluster.getFileSystem(); bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES); @@ -356,7 +362,7 @@ public void configure(JobConf conf) { ReflectionUtils.newInstance(codec, getConf()); } - blockStoragePolicy = getConf().get("test.io.block.storage.policy", null); + blockStoragePolicy = getConf().get(STORAGE_POLICY_NAME_KEY, null); } @Override // IOMapperBase @@ -388,9 +394,10 @@ void collectStats(OutputCollector output, */ public static class WriteMapper extends IOStatMapper { - public WriteMapper() { - for(int i=0; i < bufferSize; i++) - buffer[i] = (byte)('0' + i % 50); + public WriteMapper() { + for (int i = 0; i < bufferSize; i++) { + buffer[i] = (byte) ('0' + i % 50); + } } @Override // IOMapperBase @@ -431,6 +438,9 @@ private long writeTest(FileSystem fs) throws IOException { fs.delete(getDataDir(config), true); fs.delete(writeDir, true); long tStart = System.currentTimeMillis(); + if (isECEnabled()) { + createAndEnableECOnPath(fs, getDataDir(config)); + } runIOTest(WriteMapper.class, writeDir); long execTime = System.currentTimeMillis() - tStart; return execTime; @@ -734,6 +744,7 @@ public int run(String[] args) throws IOException { TestType testType = null; int bufferSize = DEFAULT_BUFFER_SIZE; long nrBytes = 1*MEGA; + String erasureCodePolicyName = null; int nrFiles = 1; long skipSize = 0; String resFileName = DEFAULT_RES_FILE_NAME; @@ -785,26 +796,31 @@ public int run(String[] args) throws IOException { resFileName = args[++i]; } else if (args[i].equalsIgnoreCase("-storagePolicy")) { storagePolicy = args[++i]; + } else if (args[i].equalsIgnoreCase("-erasureCodePolicy")) { + erasureCodePolicyName = args[++i]; } else { System.err.println("Illegal argument: " + args[i]); return -1; } } - if(testType == null) + if (testType == null) { return -1; - if(testType == TestType.TEST_TYPE_READ_BACKWARD) + } + if (testType == TestType.TEST_TYPE_READ_BACKWARD) { skipSize = -bufferSize; - else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) + } else if (testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) { skipSize = bufferSize; + } LOG.info("nrFiles = " + nrFiles); LOG.info("nrBytes (MB) = " + toMB(nrBytes)); LOG.info("bufferSize = " + bufferSize); - if(skipSize > 0) + if (skipSize > 0) { LOG.info("skipSize = " + skipSize); + } LOG.info("baseDir = " + getBaseDir(config)); - if(compressionClass != null) { + if (compressionClass != null) { config.set("test.io.compression.class", compressionClass); LOG.info("compressionClass = " + compressionClass); } @@ -813,31 +829,16 @@ else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) config.setLong("test.io.skip.size", skipSize); FileSystem fs = FileSystem.get(config); - if (storagePolicy != null) { - boolean isValid = false; - Collection storagePolicies = - ((DistributedFileSystem) fs).getAllStoragePolicies(); - try { - for (BlockStoragePolicy policy : storagePolicies) { - if (policy.getName().equals(storagePolicy)) { - isValid = true; - break; - } - } - } catch (Exception e) { - throw new IOException("Get block storage policies error: ", e); - } - if (!isValid) { - System.out.println("Invalid block storage policy: " + storagePolicy); - System.out.println("Current supported storage policy list: "); - for (BlockStoragePolicy policy : storagePolicies) { - System.out.println(policy.getName()); - } + if (erasureCodePolicyName != null) { + if (!checkErasureCodePolicy(erasureCodePolicyName, fs, testType)) { return -1; } + } - config.set("test.io.block.storage.policy", storagePolicy); - LOG.info("storagePolicy = " + storagePolicy); + if (storagePolicy != null) { + if (!checkStoragePolicy(storagePolicy, fs)) { + return -1; + } } if (isSequential) { @@ -908,6 +909,94 @@ static float toMB(long bytes) { return ((float)bytes)/MEGA; } + private boolean checkErasureCodePolicy(String erasureCodePolicyName, + FileSystem fs, TestType testType) throws IOException { + Collection list = + ((DistributedFileSystem) fs).getAllErasureCodingPolicies(); + boolean isValid = false; + for (ErasureCodingPolicy ec : list) { + if (erasureCodePolicyName.equals(ec.getName())) { + isValid = true; + break; + } + } + + if (!isValid) { + System.out.println("Invalid erasure code policy: " + + erasureCodePolicyName); + System.out.println("Current supported erasure code policy list: "); + for (ErasureCodingPolicy ec : list) { + System.out.println(ec.getName()); + } + return false; + } + + if (testType == TestType.TEST_TYPE_APPEND || + testType == TestType.TEST_TYPE_TRUNCATE) { + System.out.println("So far append or truncate operation" + + " does not support erasureCodePolicy"); + return false; + } + + config.set(ERASURE_CODE_POLICY_NAME_KEY, erasureCodePolicyName); + LOG.info("erasureCodePolicy = " + erasureCodePolicyName); + return true; + } + + private boolean checkStoragePolicy(String storagePolicy, FileSystem fs) + throws IOException { + boolean isValid = false; + Collection storagePolicies = + ((DistributedFileSystem) fs).getAllStoragePolicies(); + try { + for (BlockStoragePolicy policy : storagePolicies) { + if (policy.getName().equals(storagePolicy)) { + isValid = true; + break; + } + } + } catch (Exception e) { + throw new IOException("Get block storage policies error: ", e); + } + + if (!isValid) { + System.out.println("Invalid block storage policy: " + storagePolicy); + System.out.println("Current supported storage policy list: "); + for (BlockStoragePolicy policy : storagePolicies) { + System.out.println(policy.getName()); + } + return false; + } + + config.set(STORAGE_POLICY_NAME_KEY, storagePolicy); + LOG.info("storagePolicy = " + storagePolicy); + return true; + } + + private boolean isECEnabled() { + String erasureCodePolicyName = + getConf().get(ERASURE_CODE_POLICY_NAME_KEY, null); + return erasureCodePolicyName != null ? true : false; + } + + void createAndEnableECOnPath(FileSystem fs, Path path) + throws IOException { + String erasureCodePolicyName = + getConf().get(ERASURE_CODE_POLICY_NAME_KEY, null); + + fs.mkdirs(path); + Collection list = + ((DistributedFileSystem) fs).getAllErasureCodingPolicies(); + for (ErasureCodingPolicy ec : list) { + if (erasureCodePolicyName.equals(ec.getName())) { + ((DistributedFileSystem) fs).setErasureCodingPolicy(path, ec); + LOG.info("enable erasureCodePolicy = " + erasureCodePolicyName + + " on " + path.toString()); + break; + } + } + } + private void analyzeResult( FileSystem fs, TestType testType, long execTime,