HDFS-7994. Detect if reserved EC Block ID is already used during namenode startup. Contributed by Hui Zheng

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-04-16 13:16:37 -07:00 committed by Zhe Zhang
parent 5e8837dd6c
commit 4c039b0876
8 changed files with 321 additions and 15 deletions

View File

@@ -82,3 +82,6 @@
separate erasurecoding proto file (Rakesh R via vinayakumarb)
HDFS-7349. Support DFS command for the EC encoding (vinayakumarb)
HDFS-7994. Detect if reserved EC Block ID is already used during namenode
startup. (Hui Zheng via szetszwo)

View File

@@ -274,6 +274,9 @@ public int getPendingDataNodeMessageCount() {
/** Check whether name system is running before terminating */
private boolean checkNSRunning = true;
/** Check whether there are any non-EC blocks using StripedID */
private boolean hasNonEcBlockUsingStripedID = false;
public BlockManager(final Namesystem namesystem, final Configuration conf)
throws IOException {
this.namesystem = namesystem;
@@ -2934,6 +2937,24 @@ public double getReplicationQueuesInitProgress() {
return replicationQueuesInitProgress;
}
/**
* Get whether any non-EC blocks use an ID in the striped block ID range.
*
* @return true if any non-EC block uses a striped block ID.
*/
public boolean hasNonEcBlockUsingStripedID(){
return hasNonEcBlockUsingStripedID;
}
/**
* Set whether any non-EC blocks use an ID in the striped block ID range.
*
* @param has true if any non-EC block uses a striped block ID.
*/
public void hasNonEcBlockUsingStripedID(boolean has){
hasNonEcBlockUsingStripedID = has;
}
/**
* Process a single possibly misreplicated block. This adds it to the
* appropriate queues if necessary, and returns a result code indicating
@@ -3541,8 +3562,10 @@ public BlockInfo getStoredBlock(Block block) {
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
info = blocksMap.getStoredBlock(
new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
if ((info == null) && hasNonEcBlockUsingStripedID()){
info = blocksMap.getStoredBlock(block);
}
if (info == null) {
} else {
info = blocksMap.getStoredBlock(block);
}
return info;
@@ -3716,6 +3739,21 @@ public BlockInfo addBlockCollection(BlockInfo block,
return blocksMap.addBlockCollection(block, bc);
}
/**
* Do some checking when adding a block to the blocks map.
* For HDFS-7994: check whether the block is a non-EC block using a striped ID.
*
*/
public BlockInfo addBlockCollectionWithCheck(
BlockInfo block, BlockCollection bc) {
if (!hasNonEcBlockUsingStripedID()){
if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
hasNonEcBlockUsingStripedID(true);
}
}
return addBlockCollection(block, bc);
}
public BlockCollection getBlockCollection(Block b) {
return blocksMap.getBlockCollection(b);
}
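Taken together, the BlockManager changes above amount to a sticky flag plus a guarded fallback: blocks are registered through a checking add, and getStoredBlock only retries the raw ID when a legacy block is known to occupy the reserved range. Below is a minimal, self-contained sketch of that behavior; it assumes the reserved striped (EC) ID range is the negative ID space and uses illustrative names (StripedIdLookupSketch, SimpleBlocksMap) rather than the real HDFS classes.

import java.util.HashMap;
import java.util.Map;

public class StripedIdLookupSketch {

  // Assumption for this sketch: the reserved striped (EC) block ID range is
  // the negative ID space. In HDFS the real check lives in BlockIdManager.
  static boolean isStripedBlockID(long id) {
    return id < 0;
  }

  // A striped ID encodes a block-group ID plus an index in its low bits;
  // masking the low bits recovers the group ID (the mask here is illustrative).
  static long convertToStripedID(long id) {
    return id & ~0xFL;
  }

  /** Tiny stand-in for the NameNode blocks map. */
  static class SimpleBlocksMap {
    private final Map<Long, String> blocks = new HashMap<>();
    void put(long id, String info) { blocks.put(id, info); }
    String getStoredBlock(long id) { return blocks.get(id); }
  }

  private final SimpleBlocksMap blocksMap = new SimpleBlocksMap();
  private boolean hasNonEcBlockUsingStripedID = false;

  // Mirrors addBlockCollectionWithCheck: remember that a block occupies the
  // reserved striped-ID range.
  void addBlock(long id, String info) {
    if (isStripedBlockID(id)) {
      hasNonEcBlockUsingStripedID = true;
    }
    blocksMap.put(id, info);
  }

  // Mirrors the patched getStoredBlock: resolve through the block-group ID
  // first, and retry the raw ID only when legacy blocks are known to exist.
  String getStoredBlock(long id) {
    if (isStripedBlockID(id)) {
      String info = blocksMap.getStoredBlock(convertToStripedID(id));
      if (info == null && hasNonEcBlockUsingStripedID) {
        info = blocksMap.getStoredBlock(id);
      }
      return info;
    }
    return blocksMap.getStoredBlock(id);
  }

  public static void main(String[] args) {
    StripedIdLookupSketch sketch = new StripedIdLookupSketch();
    sketch.addBlock(-100L, "legacy contiguous block squatting on a reserved ID");
    // Without the fallback, -100 would only be looked up as its masked group
    // ID and the lookup would miss; with the flag set the raw ID is retried.
    System.out.println(sketch.getStoredBlock(-100L));
  }
}

Keeping the state as a single boolean on the block manager means clusters that never carried legacy IDs in the reserved range pay no extra map probe on the common lookup path.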

View File

@@ -990,7 +990,7 @@ private void addNewBlock(AddBlockOp op, INodeFile file, boolean isStriped)
newBlockInfo = new BlockInfoContiguousUnderConstruction(newBlock,
file.getPreferredBlockReplication());
}
fsNamesys.getBlockManager().addBlockCollection(newBlockInfo, file);
fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBlockInfo, file);
file.addBlock(newBlockInfo);
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
}
@@ -1082,7 +1082,7 @@ private void updateBlocks(FSDirectory fsDir, BlockListUpdatingOp op,
newBI = new BlockInfoContiguous(newBlock,
file.getPreferredBlockReplication());
}
fsNamesys.getBlockManager().addBlockCollection(newBI, file);
fsNamesys.getBlockManager().addBlockCollectionWithCheck(newBI, file);
file.addBlock(newBI);
fsNamesys.getBlockManager().processQueuedMessagesForBlock(newBlock);
}

View File

@@ -701,7 +701,7 @@ public void updateBlocksMap(INodeFile file) {
if (blocks != null) {
final BlockManager bm = namesystem.getBlockManager();
for (int i = 0; i < blocks.length; i++) {
file.setBlock(i, bm.addBlockCollection(blocks[i], file));
file.setBlock(i, bm.addBlockCollectionWithCheck(blocks[i], file));
}
}
}
@@ -1006,8 +1006,8 @@ LayoutVersion.Feature.ADD_INODE_ID, getLayoutVersion())) {
if (oldnode.numBlocks() > 0) {
BlockInfo ucBlock = cons.getLastBlock();
// we do not replace the inode, just replace the last block of oldnode
BlockInfo info = namesystem.getBlockManager().addBlockCollection(
ucBlock, oldnode);
BlockInfo info = namesystem.getBlockManager()
.addBlockCollectionWithCheck(ucBlock, oldnode);
oldnode.setBlock(oldnode.numBlocks() - 1, info);
}

View File

@@ -219,7 +219,7 @@ public static void updateBlocksMap(INodeFile file, BlockManager bm) {
final BlockInfo[] blocks = file.getBlocks();
if (blocks != null) {
for (int i = 0; i < blocks.length; i++) {
file.setBlock(i, bm.addBlockCollection(blocks[i], file));
file.setBlock(i, bm.addBlockCollectionWithCheck(blocks[i], file));
}
}
}

View File

@@ -247,7 +247,7 @@ private void loadFileDiffList(InputStream in, INodeFile file, int size)
(BlockInfoContiguous) fsn.getBlockManager().getStoredBlock(blk);
if(storedBlock == null) {
storedBlock = (BlockInfoContiguous) fsn.getBlockManager()
.addBlockCollection(new BlockInfoContiguous(blk,
.addBlockCollectionWithCheck(new BlockInfoContiguous(blk,
copy.getFileReplication()), file);
}
blocks[j] = storedBlock;
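The loader hunks above (edit-log replay in addNewBlock/updateBlocks, fsimage loading in the two updateBlocksMap variants, and snapshot file-diff loading in loadFileDiffList) all switch from addBlockCollection to addBlockCollectionWithCheck. The sketch below, under the same assumption that negative IDs stand in for the reserved striped range and with illustrative class and method names rather than HDFS APIs, shows the design choice: every startup path that repopulates the blocks map funnels through one checked entry point, so the detection cannot be bypassed by any particular loader.

import java.util.List;

public class CheckedAddSketch {
  private boolean hasNonEcBlockUsingStripedID = false;

  // Single choke point, standing in for addBlockCollectionWithCheck.
  void addBlockWithCheck(long blockId) {
    if (blockId < 0) {                 // assumed reserved striped-ID range
      hasNonEcBlockUsingStripedID = true;
    }
    // ... insert the block into the blocks map ...
  }

  // Stand-ins for the startup paths touched by this commit.
  void replayEditLog(List<Long> loggedBlockIds) {
    loggedBlockIds.forEach(this::addBlockWithCheck);
  }

  void loadFsImage(List<Long> imageBlockIds) {
    imageBlockIds.forEach(this::addBlockWithCheck);
  }

  void loadSnapshotDiffs(List<Long> diffBlockIds) {
    diffBlockIds.forEach(this::addBlockWithCheck);
  }

  public static void main(String[] args) {
    CheckedAddSketch nn = new CheckedAddSketch();
    nn.loadFsImage(List.of(1073741825L));  // ordinary contiguous block ID
    nn.replayEditLog(List.of(-100L));      // legacy block in the reserved EC range
    System.out.println(nn.hasNonEcBlockUsingStripedID);  // prints true
  }
}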

View File

@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
@@ -47,6 +48,7 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -570,4 +572,108 @@ public void testUpdateStripedBlocks() throws IOException{
}
}
}
@Test
public void testHasNonEcBlockUsingStripedIDForAddBlock() throws IOException{
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
FSNamesystem fns = cluster.getNamesystem();
String testDir = "/test_block_manager";
String testFile = "testfile_addblock";
String testFilePath = testDir + "/" + testFile;
String clientName = "testUser_addblock";
String clientMachine = "testMachine_addblock";
long blkId = -1;
long blkNumBytes = 1024;
long timestamp = 1426222918;
fs.mkdir(new Path(testDir), new FsPermission("755"));
Path p = new Path(testFilePath);
//check whether hasNonEcBlockUsingStripedID is set
//after loading an add-block edit log op
DFSTestUtil.createFile(fs, p, 0, (short) 1, 1);
BlockInfoContiguous cBlk = new BlockInfoContiguous(
new Block(blkId, blkNumBytes, timestamp), (short)3);
INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath);
file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk);
fns.getEditLog().logAddBlock(testFilePath, file);
file.toCompleteFile(System.currentTimeMillis());
cluster.restartNameNodes();
cluster.waitActive();
fns = cluster.getNamesystem();
assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID());
cluster.shutdown();
cluster = null;
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testHasNonEcBlockUsingStripedIDForUpdateBlocks()
throws IOException{
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
FSNamesystem fns = cluster.getNamesystem();
String testDir = "/test_block_manager";
String testFile = "testfile_002";
String testFilePath = testDir + "/" + testFile;
String clientName = "testUser2";
String clientMachine = "testMachine1";
long blkId = 100;
long blkNumBytes = 1024;
long timestamp = 1426222918;
fs.mkdir(new Path(testDir), new FsPermission("755"));
Path p = new Path(testFilePath);
DFSTestUtil.createFile(fs, p, 0, (short) 1, 1);
BlockInfoContiguous cBlk = new BlockInfoContiguous(
new Block(blkId, blkNumBytes, timestamp), (short)3);
INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath);
file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk);
file.toCompleteFile(System.currentTimeMillis());
long newBlkNumBytes = 1024*8;
long newTimestamp = 1426222918+3600;
file.toUnderConstruction(clientName, clientMachine);
file.getLastBlock().setBlockId(-100);
file.getLastBlock().setNumBytes(newBlkNumBytes);
file.getLastBlock().setGenerationStamp(newTimestamp);
fns.getEditLog().logUpdateBlocks(testFilePath, file, true);
file.toCompleteFile(System.currentTimeMillis());
cluster.restartNameNodes();
cluster.waitActive();
fns = cluster.getNamesystem();
assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID());
cluster.shutdown();
cluster = null;
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}
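Both tests above end with the same restart-and-check tail. Purely as a condensed illustration (restartAndGetFlag is a hypothetical helper, not part of HDFS; the calls it wraps appear verbatim in the tests), the pattern is:

// Hypothetical helper condensing the shared tail of the two tests above;
// MiniDFSCluster, FSNamesystem and the BlockManager getter are used exactly
// as in the test code, but this method itself is not part of HDFS.
static boolean restartAndGetFlag(MiniDFSCluster cluster) throws IOException {
  cluster.restartNameNodes();
  cluster.waitActive();
  FSNamesystem fns = cluster.getNamesystem();
  return fns.getBlockManager().hasNonEcBlockUsingStripedID();
}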

View File

@@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.DataOutput;
import java.io.DataOutputStream;
@@ -62,6 +57,12 @@
import org.apache.hadoop.test.PathUtils;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
public class TestFSImage {
private static final String HADOOP_2_7_ZER0_BLOCK_SIZE_TGZ =
@@ -426,4 +427,162 @@ public void testSupportBlockGroup() throws IOException {
cluster.shutdown();
}
}
@Test
public void testHasNonEcBlockUsingStripedIDForLoadFile() throws IOException{
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
FSNamesystem fns = cluster.getNamesystem();
String testDir = "/test_block_manager";
String testFile = "testfile_loadfile";
String testFilePath = testDir + "/" + testFile;
String clientName = "testUser_loadfile";
String clientMachine = "testMachine_loadfile";
long blkId = -1;
long blkNumBytes = 1024;
long timestamp = 1426222918;
fs.mkdir(new Path(testDir), new FsPermission("755"));
Path p = new Path(testFilePath);
DFSTestUtil.createFile(fs, p, 0, (short) 1, 1);
BlockInfoContiguous cBlk = new BlockInfoContiguous(
new Block(blkId, blkNumBytes, timestamp), (short)3);
INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath);
file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk);
file.toCompleteFile(System.currentTimeMillis());
fns.enterSafeMode(false);
fns.saveNamespace(0, 0);
cluster.restartNameNodes();
cluster.waitActive();
fns = cluster.getNamesystem();
assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID());
//after the non-EC block using a striped ID is deleted,
//hasNonEcBlockUsingStripedID should be false after the namespace is reloaded
fs = cluster.getFileSystem();
fs.delete(p,false);
fns.enterSafeMode(false);
fns.saveNamespace(0, 0);
cluster.restartNameNodes();
cluster.waitActive();
fns = cluster.getNamesystem();
assertFalse(fns.getBlockManager().hasNonEcBlockUsingStripedID());
cluster.shutdown();
cluster = null;
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testHasNonEcBlockUsingStripedIDForLoadUCFile()
throws IOException{
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
FSNamesystem fns = cluster.getNamesystem();
String testDir = "/test_block_manager";
String testFile = "testfile_loaducfile";
String testFilePath = testDir + "/" + testFile;
String clientName = "testUser_loaducfile";
String clientMachine = "testMachine_loaducfile";
long blkId = -1;
long blkNumBytes = 1024;
long timestamp = 1426222918;
fs.mkdir(new Path(testDir), new FsPermission("755"));
Path p = new Path(testFilePath);
DFSTestUtil.createFile(fs, p, 0, (short) 1, 1);
BlockInfoContiguous cBlk = new BlockInfoContiguous(
new Block(blkId, blkNumBytes, timestamp), (short)3);
INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath);
file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk);
fns.enterSafeMode(false);
fns.saveNamespace(0, 0);
cluster.restartNameNodes();
cluster.waitActive();
fns = cluster.getNamesystem();
assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID());
cluster.shutdown();
cluster = null;
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testHasNonEcBlockUsingStripedIDForLoadSnapshot()
throws IOException{
// start a cluster
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(9)
.build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
FSNamesystem fns = cluster.getNamesystem();
String testDir = "/test_block_manager";
String testFile = "testfile_loadSnapshot";
String testFilePath = testDir + "/" + testFile;
String clientName = "testUser_loadSnapshot";
String clientMachine = "testMachine_loadSnapshot";
long blkId = -1;
long blkNumBytes = 1024;
long timestamp = 1426222918;
Path d = new Path(testDir);
fs.mkdir(d, new FsPermission("755"));
fs.allowSnapshot(d);
Path p = new Path(testFilePath);
DFSTestUtil.createFile(fs, p, 0, (short) 1, 1);
BlockInfoContiguous cBlk = new BlockInfoContiguous(
new Block(blkId, blkNumBytes, timestamp), (short)3);
INodeFile file = (INodeFile)fns.getFSDirectory().getINode(testFilePath);
file.toUnderConstruction(clientName, clientMachine);
file.addBlock(cBlk);
file.toCompleteFile(System.currentTimeMillis());
fs.createSnapshot(d,"testHasNonEcBlockUsingStripeID");
fs.truncate(p,0);
fns.enterSafeMode(false);
fns.saveNamespace(0, 0);
cluster.restartNameNodes();
cluster.waitActive();
fns = cluster.getNamesystem();
assertTrue(fns.getBlockManager().hasNonEcBlockUsingStripedID());
cluster.shutdown();
cluster = null;
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}