diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index cef83185b1..30b5ee715f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -20,6 +20,8 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import java.io.DataOutput;
+import java.io.IOException;
 
 /**
  * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
@@ -206,6 +208,13 @@ public int numNodes() {
     return num;
   }
 
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeShort(dataBlockNum);
+    out.writeShort(parityBlockNum);
+    super.write(out);
+  }
+
   /**
    * Convert a complete block to an under construction block.
    * @return BlockInfoUnderConstruction - an under construction block.
@@ -215,7 +224,7 @@ public BlockInfoStripedUnderConstruction convertToBlockUnderConstruction(
     final BlockInfoStripedUnderConstruction ucBlock;
     if(isComplete()) {
       ucBlock = new BlockInfoStripedUnderConstruction(this, getDataBlockNum(),
-        getParityBlockNum(), s, targets);
+          getParityBlockNum(), s, targets);
       ucBlock.setBlockCollection(getBlockCollection());
     } else {
       // the block is already under construction
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
index aef0b2811b..0582a0a361 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
@@ -48,13 +48,16 @@
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutFlags;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -755,16 +758,31 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
         atime = in.readLong();
       }
       final long blockSize = in.readLong();
+      final boolean isStriped = NameNodeLayoutVersion.supports(
+          NameNodeLayoutVersion.Feature.ERASURE_CODING, imgVersion)
+          && (in.readBoolean());
       final int numBlocks = in.readInt();
 
       if (numBlocks >= 0) {
         // file
 
         // read blocks
-        BlockInfoContiguous[] blocks = new BlockInfoContiguous[numBlocks];
-        for (int j = 0; j < numBlocks; j++) {
-          blocks[j] = new BlockInfoContiguous(replication);
-          blocks[j].readFields(in);
+        Block[] blocks;
+        if (isStriped) {
+          blocks = new Block[numBlocks];
+          for (int j = 0; j < numBlocks; j++) {
+            short dataBlockNum = in.readShort();
+            short parityBlockNum = in.readShort();
+            blocks[j] = new BlockInfoStriped(new Block(),
+                dataBlockNum, parityBlockNum);
+            blocks[j].readFields(in);
+          }
+        } else {
+          blocks = new BlockInfoContiguous[numBlocks];
+          for (int j = 0; j < numBlocks; j++) {
+            blocks[j] = new BlockInfoContiguous(replication);
+            blocks[j].readFields(in);
+          }
         }
 
         String clientName = "";
@@ -783,9 +801,18 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
           clientMachine = FSImageSerialization.readString(in);
           // convert the last block to BlockUC
           if (blocks.length > 0) {
-            BlockInfoContiguous lastBlk = blocks[blocks.length - 1];
-            blocks[blocks.length - 1] = new BlockInfoContiguousUnderConstruction(
-                lastBlk, replication);
+            Block lastBlk = blocks[blocks.length - 1];
+            if (isStriped){
+              BlockInfoStriped lastStripedBlk = (BlockInfoStriped) lastBlk;
+              blocks[blocks.length - 1]
+                  = new BlockInfoStripedUnderConstruction(lastBlk,
+                      lastStripedBlk.getDataBlockNum(),
+                      lastStripedBlk.getParityBlockNum());
+            } else {
+              blocks[blocks.length - 1]
+                  = new BlockInfoContiguousUnderConstruction(lastBlk,
+                      replication);
+            }
           }
         }
       }
@@ -798,14 +825,25 @@ INode loadINode(final byte[] localName, boolean isSnapshotINode,
         counter.increment();
       }
 
-      final INodeFile file = new INodeFile(inodeId, localName, permissions,
-          modificationTime, atime, blocks, replication, blockSize);
+      INodeFile file;
+      if (isStriped) {
+        file = new INodeFile(inodeId, localName, permissions, modificationTime,
+            atime, new BlockInfoContiguous[0], (short) 0, blockSize);
+        file.addStripedBlocksFeature();
+        for (Block block : blocks) {
+          file.getStripedBlocksFeature().addBlock((BlockInfoStriped) block);
+        }
+      } else {
+        file = new INodeFile(inodeId, localName, permissions,
+            modificationTime, atime, (BlockInfoContiguous[]) blocks,
+            replication, blockSize);
+      }
 
       if (underConstruction) {
         file.toUnderConstruction(clientName, clientMachine);
       }
-      return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
-    } else if (numBlocks == -1) {
-      //directory
+      return fileDiffs == null ? file : new INodeFile(file, fileDiffs);
+    } else if (numBlocks == -1) {
+      //directory
       //read quotas
       final long nsQuota = in.readLong();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
index 1888d878cb..1e58858cdf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
@@ -35,6 +35,8 @@
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotFSImageFormat.ReferenceMap;
@@ -124,21 +126,48 @@ static INodeFile readINodeUnderConstruction(
     short blockReplication = in.readShort();
     long modificationTime = in.readLong();
     long preferredBlockSize = in.readLong();
+    final boolean isStriped = NameNodeLayoutVersion.supports(
+        NameNodeLayoutVersion.Feature.ERASURE_CODING, imgVersion)
+        && (in.readBoolean());
 
     int numBlocks = in.readInt();
-    BlockInfoContiguous[] blocks = new BlockInfoContiguous[numBlocks];
-    Block blk = new Block();
-    int i = 0;
-    for (; i < numBlocks-1; i++) {
-      blk.readFields(in);
-      blocks[i] = new BlockInfoContiguous(blk, blockReplication);
-    }
-    // last block is UNDER_CONSTRUCTION
-    if(numBlocks > 0) {
-      blk.readFields(in);
-      blocks[i] = new BlockInfoContiguousUnderConstruction(
-          blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+
+    final BlockInfoContiguous[] blocksContiguous;
+    BlockInfoStriped[] blocksStriped = null;
+    if (isStriped) {
+      blocksContiguous = new BlockInfoContiguous[0];
+      blocksStriped = new BlockInfoStriped[numBlocks];
+      int i = 0;
+      for (; i < numBlocks - 1; i++) {
+        short dataBlockNum = in.readShort();
+        short parityBlockNum = in.readShort();
+        blocksStriped[i] = new BlockInfoStriped(new Block(), dataBlockNum,
+            parityBlockNum);
+        blocksStriped[i].readFields(in);
+      }
+      if (numBlocks > 0) {
+        short dataBlockNum = in.readShort();
+        short parityBlockNum = in.readShort();
+        blocksStriped[i] = new BlockInfoStripedUnderConstruction(new Block(),
+            dataBlockNum, parityBlockNum, BlockUCState.UNDER_CONSTRUCTION, null);
+        blocksStriped[i].readFields(in);
+      }
+    } else {
+      blocksContiguous = new BlockInfoContiguous[numBlocks];
+      Block blk = new Block();
+      int i = 0;
+      for (; i < numBlocks-1; i++) {
+        blk.readFields(in);
+        blocksContiguous[i] = new BlockInfoContiguous(blk, blockReplication);
+      }
+      // last block is UNDER_CONSTRUCTION
+      if(numBlocks > 0) {
+        blk.readFields(in);
+        blocksContiguous[i] = new BlockInfoContiguousUnderConstruction(
+            blk, blockReplication, BlockUCState.UNDER_CONSTRUCTION, null);
+      }
     }
+
     PermissionStatus perm = PermissionStatus.read(in);
     String clientName = readString(in);
     String clientMachine = readString(in);
@@ -150,8 +179,19 @@ static INodeFile readINodeUnderConstruction(
 
     // Images in the pre-protobuf format will not have the lazyPersist flag,
     // so it is safe to pass false always.
-    INodeFile file = new INodeFile(inodeId, name, perm, modificationTime,
-        modificationTime, blocks, blockReplication, preferredBlockSize);
+    INodeFile file;
+    if (isStriped) {
+      file = new INodeFile(inodeId, name, perm, modificationTime,
+          modificationTime, blocksContiguous, (short) 0, preferredBlockSize);
+      file.addStripedBlocksFeature();
+      for (int i = 0; i < numBlocks; i++) {
+        file.getStripedBlocksFeature().addBlock(blocksStriped[i]);
+      }
+    } else {
+      file = new INodeFile(inodeId, name, perm, modificationTime,
+          modificationTime, blocksContiguous, blockReplication,
+          preferredBlockSize);
+    }
     file.toUnderConstruction(clientName, clientMachine);
     return file;
   }
@@ -166,7 +206,8 @@ static void writeINodeUnderConstruction(DataOutputStream out, INodeFile cons,
     out.writeShort(cons.getFileReplication());
     out.writeLong(cons.getModificationTime());
     out.writeLong(cons.getPreferredBlockSize());
-
+    // whether the file has striped blocks
+    out.writeBoolean(cons.isWithStripedBlocks());
     writeBlocks(cons.getBlocks(), out);
     cons.getPermissionStatus().write(out);
 
@@ -179,9 +220,9 @@ static void writeINodeUnderConstruction(DataOutputStream out, INodeFile cons,
 
   /**
    * Serialize a {@link INodeFile} node
-   * @param node The node to write
+   * @param file The node to write
    * @param out The {@link DataOutputStream} where the fields are written
-   * @param writeBlock Whether to write block information
+   * @param writeUnderConstruction Whether to write block information
    */
   public static void writeINodeFile(INodeFile file, DataOutput out,
       boolean writeUnderConstruction) throws IOException {
@@ -191,7 +232,8 @@ public static void writeINodeFile(INodeFile file, DataOutput out,
     out.writeLong(file.getModificationTime());
     out.writeLong(file.getAccessTime());
     out.writeLong(file.getPreferredBlockSize());
-
+    // whether the file has striped blocks
+    out.writeBoolean(file.isWithStripedBlocks());
     writeBlocks(file.getBlocks(), out);
     SnapshotFSImageFormat.saveFileDiffList(file, out);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
index 74ddac08a2..c4db5d4d37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java
@@ -25,8 +25,16 @@
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS;
 import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Test {@link BlockInfoStriped}
@@ -216,4 +224,30 @@ public void testReplaceBlock() {
       Assert.assertNull(newBlockInfo.getNext());
     }
   }
+
+  @Test
+  public void testWrite() {
+    long blkID = 1;
+    long numBytes = 1;
+    long generationStamp = 1;
+    short dataBlockNum = 6;
+    short parityBlockNum = 3;
+    ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE/Byte.SIZE*3
+        + Short.SIZE/Byte.SIZE*2);
+    byteBuffer.putShort(dataBlockNum).putShort(parityBlockNum)
+        .putLong(blkID).putLong(numBytes).putLong(generationStamp);
+
+    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(byteStream);
+    BlockInfoStriped blk = new BlockInfoStriped(new Block(1,1,1),
+        (short)6,(short)3);
+    try {
+      blk.write(out);
+    } catch(Exception ex) {
+      fail("testWrite error:" + ex.getMessage());
+    }
+    assertEquals(byteBuffer.array().length, byteStream.toByteArray().length);
+    assertArrayEquals(byteBuffer.array(), byteStream.toByteArray());
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
index 25c0bcfc04..21df6a993b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSImage.java
@@ -17,18 +17,28 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.EnumSet;
 
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.junit.Assert;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -42,8 +52,8 @@
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -120,6 +130,140 @@ private void testPersistHelper(Configuration conf) throws IOException {
     }
   }
 
+  private void testSaveAndLoadINodeFile(FSNamesystem fsn, Configuration conf,
+      boolean isUC) throws IOException{
+    // construct an INode with StripedBlock for saving and loading
+    long id = 123456789;
+    byte[] name = "testSaveAndLoadInodeFile_testfile".getBytes();
+    PermissionStatus permissionStatus = new PermissionStatus("testuser_a",
+        "testuser_groups", new FsPermission((short)0x755));
+    long mtime = 1426222916-3600;
+    long atime = 1426222916;
+    BlockInfoContiguous[] blks = new BlockInfoContiguous[0];
+    short replication = 3;
+    long preferredBlockSize = 128*1024*1024;
+    byte storagePolicyID = HdfsConstants.EC_STORAGE_POLICY_ID;
+    INodeFile file = new INodeFile(id, name, permissionStatus, mtime, atime,
+        blks, replication, preferredBlockSize, storagePolicyID);
+    ByteArrayOutputStream bs = new ByteArrayOutputStream();
+    file.addStripedBlocksFeature();
+
+    //construct StripedBlocks for the INode
+    BlockInfoStriped[] stripedBlks = new BlockInfoStriped[3];
+    long stripedBlkId = 10000001;
+    long timestamp = mtime+3600;
+    for (int i = 0; i < stripedBlks.length; i++) {
+      stripedBlks[i] = new BlockInfoStriped(
+          new Block(stripedBlkId + i, preferredBlockSize, timestamp),
+          (short) 6, (short) 3);
+      file.getStripedBlocksFeature().addBlock(stripedBlks[i]);
+    }
+
+    final String client = "testClient";
+    final String clientMachine = "testClientMachine";
+    final String path = "testUnderConstructionPath";
+
+    //save the INode to byte array
+    DataOutput out = new DataOutputStream(bs);
+    if (isUC) {
+      file.toUnderConstruction(client, clientMachine);
+      FSImageSerialization.writeINodeUnderConstruction((DataOutputStream) out,
+          file, path);
+    } else {
+      FSImageSerialization.writeINodeFile(file, out, false);
+    }
+    DataInput in = new DataInputStream(
+        new ByteArrayInputStream(bs.toByteArray()));
+
+    // load the INode from the byte array
+    INodeFile fileByLoaded;
+    if (isUC) {
+      fileByLoaded = FSImageSerialization.readINodeUnderConstruction(in,
+          fsn, fsn.getFSImage().getLayoutVersion());
+    } else {
+      fileByLoaded = (INodeFile) new FSImageFormat.Loader(conf, fsn)
+          .loadINodeWithLocalName(false, in, false);
+    }
+
+    assertEquals(id, fileByLoaded.getId());
+    assertArrayEquals(isUC ? path.getBytes() : name,
+        fileByLoaded.getLocalName().getBytes());
+    assertEquals(permissionStatus.getUserName(),
+        fileByLoaded.getPermissionStatus().getUserName());
+    assertEquals(permissionStatus.getGroupName(),
+        fileByLoaded.getPermissionStatus().getGroupName());
+    assertEquals(permissionStatus.getPermission(),
+        fileByLoaded.getPermissionStatus().getPermission());
+    assertEquals(mtime, fileByLoaded.getModificationTime());
+    assertEquals(isUC ? mtime : atime, fileByLoaded.getAccessTime());
+    assertEquals(0, fileByLoaded.getContiguousBlocks().length);
+    assertEquals(0, fileByLoaded.getBlockReplication());
+    assertEquals(preferredBlockSize, fileByLoaded.getPreferredBlockSize());
+
+    //check the BlockInfoStriped
+    BlockInfoStriped[] stripedBlksByLoaded =
+        fileByLoaded.getStripedBlocksFeature().getBlocks();
+    assertEquals(3, stripedBlksByLoaded.length);
+    for (int i = 0; i < 3; i++) {
+      assertEquals(stripedBlks[i].getBlockId(),
+          stripedBlksByLoaded[i].getBlockId());
+      assertEquals(stripedBlks[i].getNumBytes(),
+          stripedBlksByLoaded[i].getNumBytes());
+      assertEquals(stripedBlks[i].getGenerationStamp(),
+          stripedBlksByLoaded[i].getGenerationStamp());
+      assertEquals(stripedBlks[i].getDataBlockNum(),
+          stripedBlksByLoaded[i].getDataBlockNum());
+      assertEquals(stripedBlks[i].getParityBlockNum(),
+          stripedBlksByLoaded[i].getParityBlockNum());
+    }
+
+    if (isUC) {
+      assertEquals(client,
+          fileByLoaded.getFileUnderConstructionFeature().getClientName());
+      assertEquals(clientMachine,
+          fileByLoaded.getFileUnderConstructionFeature().getClientMachine());
+    }
+  }
+
+  /**
+   * Test if an INodeFile with BlockInfoStriped can be saved by
+   * FSImageSerialization and loaded by FSImageFormat#Loader.
+   */
+  @Test
+  public void testSaveAndLoadInodeFile() throws IOException{
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      testSaveAndLoadINodeFile(cluster.getNamesystem(), conf, false);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test if an INodeFileUnderConstruction with BlockInfoStriped can be
+   * saved and loaded by FSImageSerialization
+   */
+  @Test
+  public void testSaveAndLoadInodeFileUC() throws IOException{
+    // construct an INode with StripedBlock for saving and loading
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).build();
+      cluster.waitActive();
+      testSaveAndLoadINodeFile(cluster.getNamesystem(), conf, true);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * Ensure that the digest written by the saver equals to the digest of the
    * file.