MAPREDUCE-6774. Add support for HDFS erasure code policy to TestDFSIO. Contributed by Sammi Chen

This commit is contained in:
Kai Zheng 2016-09-18 09:03:15 +08:00
parent 58bae35447
commit 501a77856d

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
@ -106,9 +107,14 @@ public class TestDFSIO implements Tool {
" [-nrFiles N]" +
" [-size Size[B|KB|MB|GB|TB]]" +
" [-resFile resultFileName] [-bufferSize Bytes]" +
" [-storagePolicy storagePolicyName]";
" [-storagePolicy storagePolicyName]" +
" [-erasureCodePolicy erasureCodePolicyName]";
private Configuration config;
private static final String STORAGE_POLICY_NAME_KEY =
"test.io.block.storage.policy";
private static final String ERASURE_CODE_POLICY_NAME_KEY =
"test.io.erasure.code.policy";
static{
Configuration.addDefaultResource("hdfs-default.xml");
@ -211,9 +217,9 @@ public static void beforeClass() throws Exception {
bench = new TestDFSIO();
bench.getConf().setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
cluster = new MiniDFSCluster.Builder(bench.getConf())
.numDataNodes(2)
.format(true)
.build();
.numDataNodes(2)
.format(true)
.build();
FileSystem fs = cluster.getFileSystem();
bench.createControlFile(fs, DEFAULT_NR_BYTES, DEFAULT_NR_FILES);
@ -356,7 +362,7 @@ public void configure(JobConf conf) {
ReflectionUtils.newInstance(codec, getConf());
}
blockStoragePolicy = getConf().get("test.io.block.storage.policy", null);
blockStoragePolicy = getConf().get(STORAGE_POLICY_NAME_KEY, null);
}
@Override // IOMapperBase
@ -388,9 +394,10 @@ void collectStats(OutputCollector<Text, Text> output,
*/
public static class WriteMapper extends IOStatMapper {
public WriteMapper() {
for(int i=0; i < bufferSize; i++)
buffer[i] = (byte)('0' + i % 50);
public WriteMapper() {
for (int i = 0; i < bufferSize; i++) {
buffer[i] = (byte) ('0' + i % 50);
}
}
@Override // IOMapperBase
@ -431,6 +438,9 @@ private long writeTest(FileSystem fs) throws IOException {
fs.delete(getDataDir(config), true);
fs.delete(writeDir, true);
long tStart = System.currentTimeMillis();
if (isECEnabled()) {
createAndEnableECOnPath(fs, getDataDir(config));
}
runIOTest(WriteMapper.class, writeDir);
long execTime = System.currentTimeMillis() - tStart;
return execTime;
@ -734,6 +744,7 @@ public int run(String[] args) throws IOException {
TestType testType = null;
int bufferSize = DEFAULT_BUFFER_SIZE;
long nrBytes = 1*MEGA;
String erasureCodePolicyName = null;
int nrFiles = 1;
long skipSize = 0;
String resFileName = DEFAULT_RES_FILE_NAME;
@ -785,26 +796,31 @@ public int run(String[] args) throws IOException {
resFileName = args[++i];
} else if (args[i].equalsIgnoreCase("-storagePolicy")) {
storagePolicy = args[++i];
} else if (args[i].equalsIgnoreCase("-erasureCodePolicy")) {
erasureCodePolicyName = args[++i];
} else {
System.err.println("Illegal argument: " + args[i]);
return -1;
}
}
if(testType == null)
if (testType == null) {
return -1;
if(testType == TestType.TEST_TYPE_READ_BACKWARD)
}
if (testType == TestType.TEST_TYPE_READ_BACKWARD) {
skipSize = -bufferSize;
else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
} else if (testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0) {
skipSize = bufferSize;
}
LOG.info("nrFiles = " + nrFiles);
LOG.info("nrBytes (MB) = " + toMB(nrBytes));
LOG.info("bufferSize = " + bufferSize);
if(skipSize > 0)
if (skipSize > 0) {
LOG.info("skipSize = " + skipSize);
}
LOG.info("baseDir = " + getBaseDir(config));
if(compressionClass != null) {
if (compressionClass != null) {
config.set("test.io.compression.class", compressionClass);
LOG.info("compressionClass = " + compressionClass);
}
@ -813,31 +829,16 @@ else if(testType == TestType.TEST_TYPE_READ_SKIP && skipSize == 0)
config.setLong("test.io.skip.size", skipSize);
FileSystem fs = FileSystem.get(config);
if (storagePolicy != null) {
boolean isValid = false;
Collection<BlockStoragePolicy> storagePolicies =
((DistributedFileSystem) fs).getAllStoragePolicies();
try {
for (BlockStoragePolicy policy : storagePolicies) {
if (policy.getName().equals(storagePolicy)) {
isValid = true;
break;
}
}
} catch (Exception e) {
throw new IOException("Get block storage policies error: ", e);
}
if (!isValid) {
System.out.println("Invalid block storage policy: " + storagePolicy);
System.out.println("Current supported storage policy list: ");
for (BlockStoragePolicy policy : storagePolicies) {
System.out.println(policy.getName());
}
if (erasureCodePolicyName != null) {
if (!checkErasureCodePolicy(erasureCodePolicyName, fs, testType)) {
return -1;
}
}
config.set("test.io.block.storage.policy", storagePolicy);
LOG.info("storagePolicy = " + storagePolicy);
if (storagePolicy != null) {
if (!checkStoragePolicy(storagePolicy, fs)) {
return -1;
}
}
if (isSequential) {
@ -908,6 +909,94 @@ static float toMB(long bytes) {
return ((float)bytes)/MEGA;
}
private boolean checkErasureCodePolicy(String erasureCodePolicyName,
FileSystem fs, TestType testType) throws IOException {
Collection<ErasureCodingPolicy> list =
((DistributedFileSystem) fs).getAllErasureCodingPolicies();
boolean isValid = false;
for (ErasureCodingPolicy ec : list) {
if (erasureCodePolicyName.equals(ec.getName())) {
isValid = true;
break;
}
}
if (!isValid) {
System.out.println("Invalid erasure code policy: " +
erasureCodePolicyName);
System.out.println("Current supported erasure code policy list: ");
for (ErasureCodingPolicy ec : list) {
System.out.println(ec.getName());
}
return false;
}
if (testType == TestType.TEST_TYPE_APPEND ||
testType == TestType.TEST_TYPE_TRUNCATE) {
System.out.println("So far append or truncate operation" +
" does not support erasureCodePolicy");
return false;
}
config.set(ERASURE_CODE_POLICY_NAME_KEY, erasureCodePolicyName);
LOG.info("erasureCodePolicy = " + erasureCodePolicyName);
return true;
}
private boolean checkStoragePolicy(String storagePolicy, FileSystem fs)
throws IOException {
boolean isValid = false;
Collection<BlockStoragePolicy> storagePolicies =
((DistributedFileSystem) fs).getAllStoragePolicies();
try {
for (BlockStoragePolicy policy : storagePolicies) {
if (policy.getName().equals(storagePolicy)) {
isValid = true;
break;
}
}
} catch (Exception e) {
throw new IOException("Get block storage policies error: ", e);
}
if (!isValid) {
System.out.println("Invalid block storage policy: " + storagePolicy);
System.out.println("Current supported storage policy list: ");
for (BlockStoragePolicy policy : storagePolicies) {
System.out.println(policy.getName());
}
return false;
}
config.set(STORAGE_POLICY_NAME_KEY, storagePolicy);
LOG.info("storagePolicy = " + storagePolicy);
return true;
}
private boolean isECEnabled() {
String erasureCodePolicyName =
getConf().get(ERASURE_CODE_POLICY_NAME_KEY, null);
return erasureCodePolicyName != null ? true : false;
}
void createAndEnableECOnPath(FileSystem fs, Path path)
throws IOException {
String erasureCodePolicyName =
getConf().get(ERASURE_CODE_POLICY_NAME_KEY, null);
fs.mkdirs(path);
Collection<ErasureCodingPolicy> list =
((DistributedFileSystem) fs).getAllErasureCodingPolicies();
for (ErasureCodingPolicy ec : list) {
if (erasureCodePolicyName.equals(ec.getName())) {
((DistributedFileSystem) fs).setErasureCodingPolicy(path, ec);
LOG.info("enable erasureCodePolicy = " + erasureCodePolicyName +
" on " + path.toString());
break;
}
}
}
private void analyzeResult( FileSystem fs,
TestType testType,
long execTime,