HDFS-11382. Persist Erasure Coding Policy ID in a new optional field in INodeFile in FSImage. Contributed by Manoj Govindassamy.

Andrew Wang 2017-02-27 17:07:45 -08:00
parent 5f5b031d1f
commit 55c07bbed2
17 changed files with 313 additions and 72 deletions

View File

@ -79,7 +79,7 @@ public HdfsFileStatus(long length, boolean isdir, int block_replication,
byte storagePolicy, ErasureCodingPolicy ecPolicy) {
this.length = length;
this.isdir = isdir;
this.block_replication = ecPolicy == null ? (short) block_replication : 0;
this.block_replication = (short) block_replication;
this.blocksize = blocksize;
this.modification_time = modification_time;
this.access_time = access_time;

View File

@ -234,4 +234,10 @@
<Method name="assertAllResultsEqual" />
<Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
</Match>
<!-- Manually verified that signed byte value involved in bitwise OR is not negative -->
<Match>
<Class name="org.apache.hadoop.hdfs.server.namenode.INodeFile$HeaderFormat" />
<Method name="getBlockLayoutRedundancy" />
<Bug pattern="BIT_IOR_OF_SIGNED_BYTE" />
</Match>
</FindBugsFilter>
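For context on the exclusion added above: in Java, a byte is sign-extended when it is promoted for a bitwise OR against a wider type, so ORing a negative byte would set all of the high bits of the layout-redundancy field. A standalone sketch (illustrative only, not part of the patch) of the behavior that the manual verification rules out:

public class SignedByteOrDemo {
  public static void main(String[] args) {
    long stripedLayoutBit = 1L << 11;   // layout bit, as in INodeFile.HeaderFormat
    byte safeId = 9;                    // non-negative EC policy ID: only low bits set
    byte negativeId = (byte) -1;        // a negative ID sign-extends to 64 one-bits
    System.out.println(Long.toBinaryString(stripedLayoutBit | safeId));     // 100000001001
    System.out.println(Long.toBinaryString(stripedLayoutBit | negativeId)); // all ones
  }
}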

View File

@ -416,21 +416,25 @@ static INodeFile addFileForEditLog(
assert fsd.hasWriteLock();
try {
// check if the file has an EC policy
boolean isStriped = false;
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.
getErasureCodingPolicy(fsd.getFSNamesystem(), existing);
if (ecPolicy != null) {
replication = ecPolicy.getId();
isStriped = true;
}
final BlockType blockType = ecPolicy != null?
final BlockType blockType = isStriped ?
BlockType.STRIPED : BlockType.CONTIGUOUS;
final Short replicationFactor = (!isStriped ? replication : null);
final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
if (underConstruction) {
newNode = newINodeFile(id, permissions, modificationTime,
modificationTime, replication, preferredBlockSize, storagePolicyId,
blockType);
modificationTime, replicationFactor, ecPolicyID, preferredBlockSize,
storagePolicyId, blockType);
newNode.toUnderConstruction(clientName, clientMachine);
} else {
newNode = newINodeFile(id, permissions, modificationTime, atime,
replication, preferredBlockSize, storagePolicyId, blockType);
replicationFactor, ecPolicyID, preferredBlockSize,
storagePolicyId, blockType);
}
newNode.setLocalName(localName);
INodesInPath iip = fsd.addINode(existing, newNode,
@ -523,15 +527,19 @@ private static INodesInPath addFile(
INodesInPath newiip;
fsd.writeLock();
try {
boolean isStriped = false;
ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp.
getErasureCodingPolicy(fsd.getFSNamesystem(), existing);
if (ecPolicy != null) {
replication = ecPolicy.getId();
isStriped = true;
}
final BlockType blockType = ecPolicy != null?
final BlockType blockType = isStriped ?
BlockType.STRIPED : BlockType.CONTIGUOUS;
final Short replicationFactor = (!isStriped ? replication : null);
final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replication, preferredBlockSize, blockType);
modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
blockType);
newNode.setLocalName(localName);
newNode.toUnderConstruction(clientName, clientMachine);
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
@ -702,17 +710,17 @@ private static boolean completeFileInternal(
private static INodeFile newINodeFile(
long id, PermissionStatus permissions, long mtime, long atime,
short replication, long preferredBlockSize, byte storagePolicyId,
BlockType blockType) {
Short replication, Byte ecPolicyID, long preferredBlockSize,
byte storagePolicyId, BlockType blockType) {
return new INodeFile(id, null, permissions, mtime, atime,
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
BlockInfo.EMPTY_ARRAY, replication, ecPolicyID, preferredBlockSize,
storagePolicyId, blockType);
}
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
long mtime, long atime, short replication, long preferredBlockSize,
BlockType blockType) {
return newINodeFile(id, permissions, mtime, atime, replication,
long mtime, long atime, Short replication, Byte ecPolicyID,
long preferredBlockSize, BlockType blockType) {
return newINodeFile(id, permissions, mtime, atime, replication, ecPolicyID,
preferredBlockSize, (byte)0, blockType);
}
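The overloads above replace the primitive short replication with boxed Short and Byte parameters so that callers pass exactly one of the two. A minimal standalone sketch (hypothetical class, mirroring the Preconditions checks that HeaderFormat.getBlockLayoutRedundancy() introduces below) of that either-or contract:

public class LayoutParamContract {
  // A striped file carries only an EC policy ID; a contiguous file only a
  // replication factor. Passing both, or neither, is a caller bug.
  static void check(boolean striped, Short replication, Byte ecPolicyID) {
    if (striped && (replication != null || ecPolicyID == null)) {
      throw new IllegalArgumentException("striped files take ecPolicyID only");
    }
    if (!striped && (replication == null || ecPolicyID != null)) {
      throw new IllegalArgumentException("contiguous files take replication only");
    }
  }

  public static void main(String[] args) {
    check(false, (short) 3, null);      // contiguous: OK
    check(true, null, (byte) 9);        // striped: OK (hypothetical policy ID)
    try {
      check(true, (short) 3, (byte) 9); // both set: rejected
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}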

View File

@ -896,9 +896,9 @@ public INodeFileAttributes loadINodeFileAttributes(DataInput in)
in.readShort());
final long preferredBlockSize = in.readLong();
return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
accessTime, replication, preferredBlockSize, (byte) 0, null,
BlockType.CONTIGUOUS);
return new INodeFileAttributes.SnapshotCopy(name, permissions, null,
modificationTime, accessTime, replication, null, preferredBlockSize,
(byte) 0, null, BlockType.CONTIGUOUS);
}
public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)

View File

@ -329,17 +329,19 @@ private INodeFile loadINodeFile(INodeSection.INode n) {
assert n.getType() == INodeSection.INode.Type.FILE;
INodeSection.INodeFile f = n.getFile();
List<BlockProto> bp = f.getBlocksList();
short replication = (short) f.getReplication();
BlockType blockType = PBHelperClient.convert(f.getBlockType());
LoaderContext state = parent.getLoaderContext();
ErasureCodingPolicy ecPolicy = (blockType == BlockType.STRIPED) ?
ErasureCodingPolicyManager.getPolicyByPolicyID((byte) replication) :
null;
boolean isStriped = f.hasErasureCodingPolicyID();
Short replication = (!isStriped ? (short) f.getReplication() : null);
ErasureCodingPolicy ecPolicy = isStriped ?
ErasureCodingPolicyManager.getPolicyByPolicyID(
(byte) f.getErasureCodingPolicyID()) : null;
Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
BlockInfo[] blocks = new BlockInfo[bp.size()];
for (int i = 0; i < bp.size(); ++i) {
BlockProto b = bp.get(i);
if (blockType == BlockType.STRIPED) {
if (isStriped) {
blocks[i] = new BlockInfoStriped(PBHelperClient.convert(b), ecPolicy);
} else {
blocks[i] = new BlockInfoContiguous(PBHelperClient.convert(b),
@ -352,8 +354,8 @@ private INodeFile loadINodeFile(INodeSection.INode n) {
final INodeFile file = new INodeFile(n.getId(),
n.getName().toByteArray(), permissions, f.getModificationTime(),
f.getAccessTime(), blocks, replication, f.getPreferredBlockSize(),
(byte)f.getStoragePolicyID(), blockType);
f.getAccessTime(), blocks, replication, ecPolicyID,
f.getPreferredBlockSize(), (byte)f.getStoragePolicyID(), blockType);
if (f.hasAcl()) {
int[] entries = AclEntryStatusFormat.toInt(loadAclEntries(
@ -376,7 +378,7 @@ private INodeFile loadINodeFile(INodeSection.INode n) {
BlockInfo lastBlk = file.getLastBlock();
// replace the last block of file
final BlockInfo ucBlk;
if (blockType == BlockType.STRIPED) {
if (isStriped) {
BlockInfoStriped striped = (BlockInfoStriped) lastBlk;
ucBlk = new BlockInfoStriped(striped, ecPolicy);
} else {
@ -503,10 +505,15 @@ public static INodeSection.INodeFile.Builder buildINodeFile(
.setModificationTime(file.getModificationTime())
.setPermission(buildPermissionStatus(file, state.getStringMap()))
.setPreferredBlockSize(file.getPreferredBlockSize())
.setReplication(file.getFileReplication())
.setStoragePolicyID(file.getLocalStoragePolicyID())
.setBlockType(PBHelperClient.convert(file.getBlockType()));
if (file.isStriped()) {
b.setErasureCodingPolicyID(file.getErasureCodingPolicyID());
} else {
b.setReplication(file.getFileReplication());
}
AclFeature f = file.getAclFeature();
if (f != null) {
b.setAcl(buildAclEntries(f, state.getStringMap()));

View File

@ -62,6 +62,11 @@
public class INodeFile extends INodeWithAdditionalFields
implements INodeFileAttributes, BlockCollection {
/**
* Erasure Coded striped blocks have replication factor of 1.
*/
public static final short DEFAULT_REPL_FOR_STRIPED_BLOCKS = 1;
/** The same as valueOf(inode, path, false). */
public static INodeFile valueOf(INode inode, String path
) throws FileNotFoundException {
@ -126,7 +131,6 @@ enum HeaderFormat {
* Different types can be replica or EC
*/
private static final int LAYOUT_BIT_WIDTH = 1;
private static final int MAX_REDUNDANCY = (1 << 11) - 1;
HeaderFormat(LongBitFormat previous, int length, long min) {
@ -134,8 +138,13 @@ enum HeaderFormat {
}
static short getReplication(long header) {
long layoutRedundancy = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
return (short) (layoutRedundancy & MAX_REDUNDANCY);
if (isStriped(header)) {
return DEFAULT_REPL_FOR_STRIPED_BLOCKS;
} else {
long layoutRedundancy =
BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
return (short) (layoutRedundancy & MAX_REDUNDANCY);
}
}
static byte getECPolicyID(long header) {
@ -158,8 +167,7 @@ static byte getStoragePolicyID(long header) {
static final long BLOCK_TYPE_MASK_STRIPED = 1 << 11;
static boolean isStriped(long header) {
long layoutRedundancy = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
return (layoutRedundancy & BLOCK_TYPE_MASK) != 0;
return getBlockType(header) == STRIPED;
}
static BlockType getBlockType(long header) {
@ -172,22 +180,40 @@ static BlockType getBlockType(long header) {
}
}
static long toLong(long preferredBlockSize, short replication,
BlockType blockType, byte storagePolicyID) {
Preconditions.checkArgument(replication >= 0 &&
replication <= MAX_REDUNDANCY);
/**
* Construct block layout redundancy based on the given BlockType,
* replication factor and EC PolicyID.
*/
static long getBlockLayoutRedundancy(final BlockType blockType,
final Short replication, final Byte erasureCodingPolicyID) {
long layoutRedundancy = 0;
if (blockType == STRIPED) {
Preconditions.checkArgument(replication == null &&
erasureCodingPolicyID != null);
Preconditions.checkArgument(
ErasureCodingPolicyManager.getPolicyByPolicyID(
erasureCodingPolicyID) != null);
layoutRedundancy |= BLOCK_TYPE_MASK_STRIPED;
// Following bitwise OR with signed byte erasureCodingPolicyID is safe
// as the PolicyID can never be negative.
layoutRedundancy |= erasureCodingPolicyID;
} else {
Preconditions.checkArgument(replication != null &&
erasureCodingPolicyID == null);
Preconditions.checkArgument(replication >= 0 &&
replication <= MAX_REDUNDANCY);
layoutRedundancy |= replication;
}
return layoutRedundancy;
}
static long toLong(long preferredBlockSize, long layoutRedundancy,
byte storagePolicyID) {
long h = 0;
if (preferredBlockSize == 0) {
preferredBlockSize = PREFERRED_BLOCK_SIZE.BITS.getMin();
}
h = PREFERRED_BLOCK_SIZE.BITS.combine(preferredBlockSize, h);
// For erasure coded files, replication is used to store ec policy id
// TODO: this is hacky. Add some utility to generate the layoutRedundancy
long layoutRedundancy = 0;
if (blockType == STRIPED) {
layoutRedundancy |= BLOCK_TYPE_MASK_STRIPED;
}
layoutRedundancy |= replication;
h = BLOCK_LAYOUT_AND_REDUNDANCY.BITS.combine(layoutRedundancy, h);
h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h);
return h;
@ -202,15 +228,17 @@ static long toLong(long preferredBlockSize, short replication,
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
long atime, BlockInfo[] blklist, short replication,
long preferredBlockSize) {
this(id, name, permissions, mtime, atime, blklist, replication,
this(id, name, permissions, mtime, atime, blklist, replication, null,
preferredBlockSize, (byte) 0, CONTIGUOUS);
}
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
long atime, BlockInfo[] blklist, short replication,
long atime, BlockInfo[] blklist, Short replication, Byte ecPolicyID,
long preferredBlockSize, byte storagePolicyID, BlockType blockType) {
super(id, name, permissions, mtime, atime);
header = HeaderFormat.toLong(preferredBlockSize, replication, blockType,
final long layoutRedundancy = HeaderFormat.getBlockLayoutRedundancy(
blockType, replication, ecPolicyID);
header = HeaderFormat.toLong(preferredBlockSize, layoutRedundancy,
storagePolicyID);
if (blklist != null && blklist.length > 0) {
for (BlockInfo b : blklist) {
@ -462,6 +490,9 @@ public final short getFileReplication(int snapshot) {
* */
@Override // INodeFileAttributes
public final short getFileReplication() {
if (isStriped()) {
return DEFAULT_REPL_FOR_STRIPED_BLOCKS;
}
return getFileReplication(CURRENT_STATE_ID);
}
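Taken together, the HeaderFormat changes reuse the 11 redundancy bits for either the replication factor or the EC policy ID, discriminated by the layout bit. A self-contained sketch of the packing (assuming the existing field widths of low 48 bits for preferred block size, then 12 bits of layout plus redundancy, then 4 bits of storage policy ID; illustrative, not the actual HDFS implementation):

public class HeaderFormatSketch {
  static final long BLOCK_TYPE_MASK_STRIPED = 1 << 11; // layout bit
  static final int MAX_REDUNDANCY = (1 << 11) - 1;     // low 11 bits

  // Packs [4-bit storage policy][12-bit layout+redundancy][48-bit block size].
  static long pack(long blockSize, long layoutRedundancy, byte storagePolicy) {
    return (((long) storagePolicy) << 60) | (layoutRedundancy << 48) | blockSize;
  }

  public static void main(String[] args) {
    long striped = BLOCK_TYPE_MASK_STRIPED | 9; // hypothetical EC policy ID 9
    long header = pack(128L * 1024 * 1024, striped, (byte) 0);
    long lr = (header >>> 48) & 0xFFF;
    System.out.println("isStriped  = " + ((lr & BLOCK_TYPE_MASK_STRIPED) != 0));
    System.out.println("ecPolicyID = " + (lr & MAX_REDUNDANCY)); // prints 9
    // For a contiguous file the layout bit is clear and the same low
    // 11 bits hold the replication factor instead.
  }
}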

View File

@ -56,11 +56,13 @@ static class SnapshotCopy extends INodeAttributes.SnapshotCopy
public SnapshotCopy(byte[] name, PermissionStatus permissions,
AclFeature aclFeature, long modificationTime, long accessTime,
short replication, long preferredBlockSize,
Short replication, Byte ecPolicyID, long preferredBlockSize,
byte storagePolicyID, XAttrFeature xAttrsFeature, BlockType blockType) {
super(name, permissions, aclFeature, modificationTime, accessTime,
xAttrsFeature);
header = HeaderFormat.toLong(preferredBlockSize, replication, blockType,
final long layoutRedundancy = HeaderFormat.getBlockLayoutRedundancy(
blockType, replication, ecPolicyID);
header = HeaderFormat.toLong(preferredBlockSize, layoutRedundancy,
storagePolicyID);
}

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTypeProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
@ -232,9 +233,15 @@ private void loadFileDiffList(InputStream in, INodeFile file, int size)
fileInPb.getXAttrs(), state.getStringTable()));
}
boolean isStriped =
(fileInPb.getBlockType() == BlockTypeProto.STRIPED);
Short replication =
(!isStriped ? (short)fileInPb.getReplication() : null);
Byte ecPolicyID =
(isStriped ? (byte)fileInPb.getErasureCodingPolicyID() : null);
copy = new INodeFileAttributes.SnapshotCopy(pbf.getName()
.toByteArray(), permission, acl, fileInPb.getModificationTime(),
fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
fileInPb.getAccessTime(), replication, ecPolicyID,
fileInPb.getPreferredBlockSize(),
(byte)fileInPb.getStoragePolicyID(), xAttrs,
PBHelperClient.convert(fileInPb.getBlockType()));

View File

@ -49,6 +49,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.web.JsonUtil;
import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
@ -589,7 +590,11 @@ private long lookup(String path) throws IOException {
map.put("pathSuffix",
printSuffix ? inode.getName().toStringUtf8() : "");
map.put("permission", toString(p.getPermission()));
map.put("replication", f.getReplication());
if (f.hasErasureCodingPolicyID()) {
map.put("replication", INodeFile.DEFAULT_REPL_FOR_STRIPED_BLOCKS);
} else {
map.put("replication", f.getReplication());
}
map.put("type", inode.getType());
map.put("fileId", inode.getId());
map.put("childrenNum", 0);

View File

@ -496,9 +496,7 @@ private void dumpINodeFile(INodeSection.INodeFile f) {
o(INODE_SECTION_STORAGE_POLICY_ID, f.getStoragePolicyID());
}
if (f.getBlockType() != BlockTypeProto.CONTIGUOUS) {
out.print("<" + INODE_SECTION_BLOCK_TYPE + ">");
o(SECTION_NAME, f.getBlockType().name());
out.print("</" + INODE_SECTION_BLOCK_TYPE + ">\n");
o(INODE_SECTION_BLOCK_TYPE, f.getBlockType().name());
}
if (f.hasFileUC()) {

View File

@ -141,6 +141,7 @@ message INodeSection {
optional XAttrFeatureProto xAttrs = 9;
optional uint32 storagePolicyID = 10;
optional BlockTypeProto blockType = 11;
optional uint32 erasureCodingPolicyID = 12;
}
message QuotaByStorageTypeEntryProto {
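With the new optional field, presence of erasureCodingPolicyID is what the loader uses to tell striped from contiguous files. A hedged round-trip sketch using the generated protobuf classes (accessor names follow mechanically from the message definition above; treat this as illustrative, not the exact NameNode code path):

import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;

public class EcPolicyFieldRoundTrip {
  public static void main(String[] args) {
    // Saver side (cf. buildINodeFile): the two fields are mutually
    // exclusive, so a striped file persists only its EC policy ID.
    INodeSection.INodeFile striped = INodeSection.INodeFile.newBuilder()
        .setErasureCodingPolicyID(9) // hypothetical policy ID
        .build();

    // Loader side (cf. loadINodeFile): the optional field's presence is
    // the striped/contiguous discriminator.
    boolean isStriped = striped.hasErasureCodingPolicyID();
    Byte ecPolicyID = isStriped
        ? (byte) striped.getErasureCodingPolicyID() : null;
    Short replication = isStriped ? null : (short) striped.getReplication();
    System.out.println(isStriped + " / " + ecPolicyID + " / " + replication);
  }
}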

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.io.erasurecode.ECSchema;
import org.junit.After;
import org.junit.Before;
@ -343,7 +344,9 @@ public void testMultiplePoliciesCoExist() throws Exception {
assertEquals(policy, fs.getErasureCodingPolicy(file));
assertEquals(policy, fs.getErasureCodingPolicy(dir));
INode iNode = namesystem.getFSDirectory().getINode(file.toString());
assertEquals(policy.getId(), iNode.asFile().getFileReplication());
assertEquals(policy.getId(), iNode.asFile().getErasureCodingPolicyID());
assertEquals(INodeFile.DEFAULT_REPL_FOR_STRIPED_BLOCKS,
iNode.asFile().getFileReplication());
}
}
}

View File

@ -155,7 +155,7 @@ private void testPersistHelper(Configuration conf) throws IOException {
private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration conf,
boolean isUC) throws IOException{
// contruct a INode with StripedBlock for saving and loading
// Construct an INode with StripedBlock for saving and loading
fsn.setErasureCodingPolicy("/", testECPolicy, false);
long id = 123456789;
byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
@ -163,29 +163,30 @@ private void testSaveAndLoadStripedINodeFile(FSNamesystem fsn, Configuration con
"testuser_groups", new FsPermission((short)0x755));
long mtime = 1426222916-3600;
long atime = 1426222916;
BlockInfoContiguous[] blks = new BlockInfoContiguous[0];
short replication = testECPolicy.getId();
BlockInfoContiguous[] blocks = new BlockInfoContiguous[0];
byte erasureCodingPolicyID = testECPolicy.getId();
long preferredBlockSize = 128*1024*1024;
INodeFile file = new INodeFile(id, name, permissionStatus, mtime, atime,
blks, replication, preferredBlockSize, (byte) 0, BlockType.STRIPED);
blocks, null, erasureCodingPolicyID, preferredBlockSize,
(byte) 0, BlockType.STRIPED);
ByteArrayOutputStream bs = new ByteArrayOutputStream();
//construct StripedBlocks for the INode
BlockInfoStriped[] stripedBlks = new BlockInfoStriped[3];
// Construct StripedBlocks for the INode
BlockInfoStriped[] stripedBlocks = new BlockInfoStriped[3];
long stripedBlkId = 10000001;
long timestamp = mtime+3600;
for (int i = 0; i < stripedBlks.length; i++) {
stripedBlks[i] = new BlockInfoStriped(
for (int i = 0; i < stripedBlocks.length; i++) {
stripedBlocks[i] = new BlockInfoStriped(
new Block(stripedBlkId + i, preferredBlockSize, timestamp),
testECPolicy);
file.addBlock(stripedBlks[i]);
file.addBlock(stripedBlocks[i]);
}
final String client = "testClient";
final String clientMachine = "testClientMachine";
final String path = "testUnderConstructionPath";
//save the INode to byte array
// Save the INode to byte array
DataOutput out = new DataOutputStream(bs);
if (isUC) {
file.toUnderConstruction(client, clientMachine);
@ -495,6 +496,7 @@ public void testSupportBlockGroup() throws Exception {
FSNamesystem fsn = cluster.getNamesystem();
INodeFile inode = fsn.dir.getINode(file_10_4.toString()).asFile();
assertTrue(inode.isStriped());
assertEquals(testECPolicy.getId(), inode.getErasureCodingPolicyID());
BlockInfo[] blks = inode.getBlocks();
assertEquals(1, blks.length);
assertTrue(blks[0].isStriped());
@ -513,6 +515,9 @@ public void testSupportBlockGroup() throws Exception {
// check the information of file_3_2
inode = fsn.dir.getINode(file_3_2.toString()).asFile();
assertTrue(inode.isStriped());
assertEquals(ErasureCodingPolicyManager.getPolicyByPolicyID(
HdfsConstants.RS_3_2_POLICY_ID).getId(),
inode.getErasureCodingPolicyID());
blks = inode.getBlocks();
assertEquals(1, blks.length);
assertTrue(blks[0].isStriped());

View File

@ -70,6 +70,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -104,15 +105,15 @@ INodeFile createINodeFile(short replication, long preferredBlockSize) {
null, replication, preferredBlockSize);
}
INodeFile createStripedINodeFile(short replication, long preferredBlockSize) {
INodeFile createStripedINodeFile(long preferredBlockSize) {
return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
null, replication, preferredBlockSize,
null, null, HdfsConstants.RS_6_3_POLICY_ID, preferredBlockSize,
HdfsConstants.WARM_STORAGE_POLICY_ID, STRIPED);
}
private static INodeFile createINodeFile(byte storagePolicyID) {
return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
null, (short)3, 1024L, storagePolicyID, CONTIGUOUS);
null, (short)3, null, 1024L, storagePolicyID, CONTIGUOUS);
}
@Test
@ -133,6 +134,61 @@ public void testStoragePolicyIdAboveUpperBound () throws IllegalArgumentExceptio
createINodeFile((byte)16);
}
@Test
public void testContiguousLayoutRedundancy() {
INodeFile inodeFile;
try {
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, new Short((short) 3) /*replication*/,
HdfsConstants.RS_6_3_POLICY_ID /*ec policy*/,
preferredBlockSize, HdfsConstants.WARM_STORAGE_POLICY_ID, CONTIGUOUS);
fail("INodeFile construction should fail when both replication and " +
"ECPolicy requested!");
} catch (IllegalArgumentException iae) {
LOG.info("Expected exception: ", iae);
}
try {
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, null /*replication*/, null /*ec policy*/,
preferredBlockSize, HdfsConstants.WARM_STORAGE_POLICY_ID, CONTIGUOUS);
fail("INodeFile construction should fail when replication param not " +
"provided for contiguous layout!");
} catch (IllegalArgumentException iae) {
LOG.info("Expected exception: ", iae);
}
try {
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, Short.MAX_VALUE /*replication*/,
null /*ec policy*/, preferredBlockSize,
HdfsConstants.WARM_STORAGE_POLICY_ID, CONTIGUOUS);
fail("INodeFile construction should fail when replication param is " +
"beyond the range supported!");
} catch (IllegalArgumentException iae) {
LOG.info("Expected exception: ", iae);
}
final Short replication = new Short((short) 3);
try {
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, replication, null /*ec policy*/,
preferredBlockSize, HdfsConstants.WARM_STORAGE_POLICY_ID, STRIPED);
fail("INodeFile construction should fail when replication param is " +
"provided for striped layout!");
} catch (IllegalArgumentException iae) {
LOG.info("Expected exception: ", iae);
}
inodeFile = new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, replication, null /*ec policy*/,
preferredBlockSize, HdfsConstants.WARM_STORAGE_POLICY_ID, CONTIGUOUS);
Assert.assertFalse(inodeFile.isStriped());
Assert.assertEquals(replication.shortValue(),
inodeFile.getFileReplication());
}
/**
* Test for the Replication value. Sets a value and checks if it was set
* correct.
@ -237,7 +293,7 @@ public void testGetBlockType() {
preferredBlockSize = 128*1024*1024;
INodeFile inf = createINodeFile(replication, preferredBlockSize);
assertEquals(inf.getBlockType(), CONTIGUOUS);
INodeFile striped = createStripedINodeFile(replication, preferredBlockSize);
INodeFile striped = createStripedINodeFile(preferredBlockSize);
assertEquals(striped.getBlockType(), STRIPED);
}

View File

@ -17,9 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.protocol.BlockType.CONTIGUOUS;
import static org.apache.hadoop.hdfs.protocol.BlockType.STRIPED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.io.IOException;
@ -76,8 +79,8 @@ public class TestStripedINodeFile {
private static INodeFile createStripedINodeFile() {
return new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
null, (short)0, 1024L, HdfsConstants.COLD_STORAGE_POLICY_ID,
BlockType.STRIPED);
null, null, HdfsConstants.RS_6_3_POLICY_ID, 1024L,
HdfsConstants.COLD_STORAGE_POLICY_ID, BlockType.STRIPED);
}
@Test
@ -95,6 +98,61 @@ public void testBlockStripedTotalBlockCount() {
assertEquals(9, blockInfoStriped.getTotalBlockNum());
}
@Test
public void testStripedLayoutRedundancy() {
INodeFile inodeFile;
try {
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, new Short((short) 3) /*replication*/,
HdfsConstants.RS_6_3_POLICY_ID /*ec policy*/,
1024L, HdfsConstants.WARM_STORAGE_POLICY_ID, STRIPED);
fail("INodeFile construction should fail when both replication and " +
"ECPolicy requested!");
} catch (IllegalArgumentException iae) {
LOG.info("Expected exception: ", iae);
}
try {
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, null /*replication*/, null /*ec policy*/,
1024L, HdfsConstants.WARM_STORAGE_POLICY_ID, STRIPED);
fail("INodeFile construction should fail when EC Policy param not " +
"provided for striped layout!");
} catch (IllegalArgumentException iae) {
LOG.info("Expected exception: ", iae);
}
try {
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, null /*replication*/,
Byte.MAX_VALUE /*ec policy*/, 1024L,
HdfsConstants.WARM_STORAGE_POLICY_ID, STRIPED);
fail("INodeFile construction should fail when EC Policy is " +
"not in the supported list!");
} catch (IllegalArgumentException iae) {
LOG.info("Expected exception: ", iae);
}
final Byte ecPolicyID = HdfsConstants.RS_6_3_POLICY_ID;
try {
new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, null /*replication*/, ecPolicyID,
1024L, HdfsConstants.WARM_STORAGE_POLICY_ID, CONTIGUOUS);
fail("INodeFile construction should fail when replication param is " +
"provided for striped layout!");
} catch (IllegalArgumentException iae) {
LOG.info("Expected exception: ", iae);
}
inodeFile = new INodeFile(HdfsConstants.GRANDFATHER_INODE_ID,
null, perm, 0L, 0L, null, null /*replication*/, ecPolicyID,
1024L, HdfsConstants.WARM_STORAGE_POLICY_ID, STRIPED);
Assert.assertTrue(inodeFile.isStriped());
Assert.assertEquals(ecPolicyID.byteValue(),
inodeFile.getErasureCodingPolicyID());
}
@Test
public void testBlockStripedLength()
throws IOException, InterruptedException {

View File

@ -48,6 +48,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -75,7 +76,10 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
@ -103,6 +107,7 @@ public class TestOfflineImageViewer {
private static final int FILES_PER_DIR = 4;
private static final String TEST_RENEWER = "JobTracker";
private static File originalFsimage = null;
private static int filesECCount = 0;
// namespace as written to dfs, to be compared with viewer's output
final static HashMap<String, FileStatus> writtenFiles = Maps.newHashMap();
@ -129,7 +134,7 @@ public static void createOriginalFSImage() throws IOException {
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
"RULE:[2:$1@$0](JobTracker@.*FOO.COM)s/@.*//" + "DEFAULT");
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
DistributedFileSystem hdfs = cluster.getFileSystem();
@ -224,9 +229,38 @@ public static void createOriginalFSImage() throws IOException {
aclEntry(ACCESS, GROUP, "bar", READ_EXECUTE),
aclEntry(ACCESS, OTHER, EXECUTE)));
// Create an Erasure Coded dir
Path ecDir = new Path("/ec");
hdfs.mkdirs(ecDir);
dirCount++;
ErasureCodingPolicy ecPolicy =
ErasureCodingPolicyManager.getPolicyByPolicyID(
HdfsConstants.XOR_2_1_POLICY_ID);
hdfs.getClient().setErasureCodingPolicy(ecDir.toString(), ecPolicy);
writtenFiles.put(ecDir.toString(), hdfs.getFileStatus(ecDir));
// Create an empty Erasure Coded file
Path emptyECFile = new Path(ecDir, "EmptyECFile.txt");
hdfs.create(emptyECFile).close();
writtenFiles.put(emptyECFile.toString(),
pathToFileEntry(hdfs, emptyECFile.toString()));
filesECCount++;
// Create a small Erasure Coded file
Path smallECFile = new Path(ecDir, "SmallECFile.txt");
FSDataOutputStream out = hdfs.create(smallECFile);
Random r = new Random();
byte[] bytes = new byte[1024 * 10];
r.nextBytes(bytes);
out.write(bytes);
out.close();
writtenFiles.put(smallECFile.toString(),
pathToFileEntry(hdfs, smallECFile.toString()));
filesECCount++;
// Write results to the fsimage file
hdfs.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
hdfs.saveNamespace();
hdfs.setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
// Determine location of fsimage file
originalFsimage = FSImageTestUtil.findLatestImageFile(FSImageTestUtil
@ -292,7 +326,7 @@ public void testFileDistributionCalculator() throws IOException {
Matcher matcher = p.matcher(outputString);
assertTrue(matcher.find() && matcher.groupCount() == 1);
int totalFiles = Integer.parseInt(matcher.group(1));
assertEquals(NUM_DIRS * FILES_PER_DIR + 1, totalFiles);
assertEquals(NUM_DIRS * FILES_PER_DIR + filesECCount + 1, totalFiles);
p = Pattern.compile("totalDirectories = (\\d+)\n");
matcher = p.matcher(outputString);
@ -375,6 +409,24 @@ public void testWebImageViewer() throws Exception {
url = new URL("http://localhost:" + port + "/foo");
verifyHttpResponseCode(HttpURLConnection.HTTP_NOT_FOUND, url);
// Verify the Erasure Coded empty file status
Path emptyECFilePath = new Path("/ec/EmptyECFile.txt");
FileStatus actualEmptyECFileStatus =
webhdfs.getFileStatus(new Path(emptyECFilePath.toString()));
FileStatus expectedEmptyECFileStatus = writtenFiles.get(
emptyECFilePath.toString());
System.out.println(webhdfs.getFileStatus(new Path(emptyECFilePath
.toString())));
compareFile(expectedEmptyECFileStatus, actualEmptyECFileStatus);
// Verify the Erasure Coded small file status
Path smallECFilePath = new Path("/ec/SmallECFile.txt");
FileStatus actualSmallECFileStatus =
webhdfs.getFileStatus(new Path(smallECFilePath.toString()));
FileStatus expectedSmallECFileStatus = writtenFiles.get(
smallECFilePath.toString());
compareFile(expectedSmallECFileStatus, actualSmallECFileStatus);
// GETFILESTATUS operation
status = webhdfs.getFileStatus(new Path("/dir0/file0"));
compareFile(expected, status);

View File

@ -143,6 +143,8 @@ private void testFileSize(int numBytes) throws IOException,
// Verify space consumed present in BlockInfoStriped
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
INodeFile fileNode = fsdir.getINode4Write(file.toString()).asFile();
assertEquals(ErasureCodingPolicyManager.getSystemDefaultPolicy().getId(),
fileNode.getErasureCodingPolicyID());
assertTrue("Invalid block size", fileNode.getBlocks().length > 0);
long actualFileSize = 0;
for (BlockInfo blockInfo : fileNode.getBlocks()) {