HDFS-11416. Refactor out system default erasure coding policy. Contributed by Andrew Wang.
This commit is contained in:
parent
8f4817f2c5
commit
3749152b66
@ -177,6 +177,17 @@ public static String byteToHexString(byte bytes[]) {
|
||||
return byteToHexString(bytes, 0, bytes.length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a byte to a hex string.
|
||||
* @see #byteToHexString(byte[])
|
||||
* @see #byteToHexString(byte[], int, int)
|
||||
* @param b byte
|
||||
* @return byte's hex value as a String
|
||||
*/
|
||||
public static String byteToHexString(byte b) {
|
||||
return byteToHexString(new byte[] {b});
|
||||
}
|
||||
|
||||
/**
|
||||
* Given a hexstring this will return the byte array corresponding to the
|
||||
* string
|
||||
|
@ -102,6 +102,7 @@ static List<XAttr> setErasureCodingPolicyXAttr(final FSNamesystem fsn,
|
||||
FSDirectory fsd = fsn.getFSDirectory();
|
||||
assert fsd.hasWriteLock();
|
||||
Preconditions.checkNotNull(srcIIP, "INodes cannot be null");
|
||||
Preconditions.checkNotNull(ecPolicy, "EC policy cannot be null");
|
||||
String src = srcIIP.getPath();
|
||||
final INode inode = srcIIP.getLastINode();
|
||||
if (inode == null) {
|
||||
@ -112,29 +113,24 @@ static List<XAttr> setErasureCodingPolicyXAttr(final FSNamesystem fsn,
|
||||
"for a file " + src);
|
||||
}
|
||||
|
||||
// System default erasure coding policy will be used since no specified.
|
||||
if (ecPolicy == null) {
|
||||
ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
} else {
|
||||
// If ecPolicy is specified check if it is one among active policies.
|
||||
boolean validPolicy = false;
|
||||
ErasureCodingPolicy[] activePolicies =
|
||||
FSDirErasureCodingOp.getErasureCodingPolicies(fsd.getFSNamesystem());
|
||||
// Check that the EC policy is one of the active policies.
|
||||
boolean validPolicy = false;
|
||||
ErasureCodingPolicy[] activePolicies =
|
||||
FSDirErasureCodingOp.getErasureCodingPolicies(fsd.getFSNamesystem());
|
||||
for (ErasureCodingPolicy activePolicy : activePolicies) {
|
||||
if (activePolicy.equals(ecPolicy)) {
|
||||
validPolicy = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!validPolicy) {
|
||||
List<String> ecPolicyNames = new ArrayList<String>();
|
||||
for (ErasureCodingPolicy activePolicy : activePolicies) {
|
||||
if (activePolicy.equals(ecPolicy)) {
|
||||
validPolicy = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!validPolicy) {
|
||||
List<String> ecPolicyNames = new ArrayList<String>();
|
||||
for (ErasureCodingPolicy activePolicy : activePolicies) {
|
||||
ecPolicyNames.add(activePolicy.getName());
|
||||
}
|
||||
throw new HadoopIllegalArgumentException("Policy [ " +
|
||||
ecPolicy.getName() + " ] does not match any of the " +
|
||||
"supported policies. Please select any one of " + ecPolicyNames);
|
||||
ecPolicyNames.add(activePolicy.getName());
|
||||
}
|
||||
throw new HadoopIllegalArgumentException("Policy [ " +
|
||||
ecPolicy.getName() + " ] does not match any of the " +
|
||||
"supported policies. Please select any one of " + ecPolicyNames);
|
||||
}
|
||||
|
||||
final XAttr ecXAttr;
|
||||
|
@ -1108,8 +1108,7 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
|
||||
// versions of Hadoop. Current versions always log
|
||||
// OP_ADD operations as each block is allocated.
|
||||
if (isStriped) {
|
||||
newBI = new BlockInfoStriped(newBlock,
|
||||
ErasureCodingPolicyManager.getSystemDefaultPolicy());
|
||||
newBI = new BlockInfoStriped(newBlock, ecPolicy);
|
||||
} else {
|
||||
newBI = new BlockInfoContiguous(newBlock,
|
||||
file.getFileReplication());
|
||||
|
@ -56,6 +56,7 @@
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/** I-node for closed file. */
|
||||
@InterfaceAudience.Private
|
||||
@ -190,9 +191,10 @@ static long getBlockLayoutRedundancy(final BlockType blockType,
|
||||
if (blockType == STRIPED) {
|
||||
Preconditions.checkArgument(replication == null &&
|
||||
erasureCodingPolicyID != null);
|
||||
Preconditions.checkArgument(
|
||||
ErasureCodingPolicyManager.getPolicyByPolicyID(
|
||||
erasureCodingPolicyID) != null);
|
||||
Preconditions.checkArgument(ErasureCodingPolicyManager
|
||||
.getPolicyByPolicyID(erasureCodingPolicyID) != null,
|
||||
"Could not find EC policy with ID 0x" + StringUtils
|
||||
.byteToHexString(erasureCodingPolicyID));
|
||||
layoutRedundancy |= BLOCK_TYPE_MASK_STRIPED;
|
||||
// Following bitwise OR with signed byte erasureCodingPolicyID is safe
|
||||
// as the PolicyID can never be in negative.
|
||||
@ -201,7 +203,8 @@ static long getBlockLayoutRedundancy(final BlockType blockType,
|
||||
Preconditions.checkArgument(replication != null &&
|
||||
erasureCodingPolicyID == null);
|
||||
Preconditions.checkArgument(replication >= 0 &&
|
||||
replication <= MAX_REDUNDANCY);
|
||||
replication <= MAX_REDUNDANCY,
|
||||
"Invalid replication value " + replication);
|
||||
layoutRedundancy |= replication;
|
||||
}
|
||||
return layoutRedundancy;
|
||||
@ -513,9 +516,8 @@ public short getPreferredBlockReplication() {
|
||||
ErasureCodingPolicy ecPolicy =
|
||||
ErasureCodingPolicyManager.getPolicyByPolicyID(
|
||||
getErasureCodingPolicyID());
|
||||
if (ecPolicy == null){
|
||||
ecPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy();
|
||||
}
|
||||
Preconditions.checkNotNull(ecPolicy, "Could not find EC policy with ID 0x"
|
||||
+ StringUtils.byteToHexString(getErasureCodingPolicyID()));
|
||||
return (short) (ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
|
||||
}
|
||||
|
||||
|
@ -1311,11 +1311,8 @@ public String toString() {
|
||||
|
||||
@VisibleForTesting
|
||||
static class ErasureCodingResult extends Result {
|
||||
final String defaultECPolicy;
|
||||
|
||||
ErasureCodingResult(Configuration conf) {
|
||||
defaultECPolicy = ErasureCodingPolicyManager.getSystemDefaultPolicy()
|
||||
.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1392,8 +1389,7 @@ public String toString() {
|
||||
((float) (numMisReplicatedBlocks * 100) / (float) totalBlocks))
|
||||
.append(" %)");
|
||||
}
|
||||
res.append("\n Default ecPolicy:\t\t").append(defaultECPolicy)
|
||||
.append("\n Average block group size:\t").append(
|
||||
res.append("\n Average block group size:\t").append(
|
||||
getReplicationFactor()).append("\n Missing block groups:\t\t").append(
|
||||
missingIds.size()).append("\n Corrupt block groups:\t\t").append(
|
||||
corruptBlocks).append("\n Missing internal blocks:\t").append(
|
||||
|
@ -54,6 +54,7 @@
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.junit.rules.Timeout;
|
||||
|
||||
/**
|
||||
@ -83,6 +84,18 @@ private static INodeFile createStripedINodeFile() {
|
||||
HdfsConstants.COLD_STORAGE_POLICY_ID, BlockType.STRIPED);
|
||||
}
|
||||
|
||||
@Rule
|
||||
public ExpectedException thrown = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testInvalidECPolicy() throws IllegalArgumentException {
|
||||
thrown.expect(IllegalArgumentException.class);
|
||||
thrown.expectMessage("Could not find EC policy with ID 0xbb");
|
||||
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
|
||||
null, null, (byte) 0xBB, 1024L,
|
||||
HdfsConstants.COLD_STORAGE_POLICY_ID, BlockType.STRIPED);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockStripedFeature()
|
||||
throws IOException, InterruptedException{
|
||||
|
Loading…
Reference in New Issue
Block a user