HDFS-7339. Allocating and persisting block groups in NameNode. Contributed by Zhe Zhang

This commit is contained in:
Zhe Zhang 2015-01-30 16:16:26 -08:00 committed by Zhe Zhang
parent f166e67a23
commit bc2833b1c9
7 changed files with 198 additions and 6 deletions

View File

@ -159,6 +159,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT = 3;
public static final String DFS_NAMENODE_REPLICATION_MIN_KEY = "dfs.namenode.replication.min";
public static final int DFS_NAMENODE_REPLICATION_MIN_DEFAULT = 1;
public static final String DFS_NAMENODE_STRIPE_MIN_KEY = "dfs.namenode.stripe.min";
public static final int DFS_NAMENODE_STRIPE_MIN_DEFAULT = 1;
public static final String DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY = "dfs.namenode.replication.pending.timeout-sec";
public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";

View File

@ -54,10 +54,12 @@ public class BlockIdManager {
* The global block ID space for this file system.
*/
private final SequentialBlockIdGenerator blockIdGenerator;
private final SequentialBlockGroupIdGenerator blockGroupIdGenerator;
public BlockIdManager(BlockManager blockManager) {
this.generationStampV1Limit = HdfsConstants.GRANDFATHER_GENERATION_STAMP;
this.blockIdGenerator = new SequentialBlockIdGenerator(blockManager);
this.blockGroupIdGenerator = new SequentialBlockGroupIdGenerator(blockManager);
}
/**
@ -191,6 +193,10 @@ public long nextBlockId() {
return blockIdGenerator.nextValue();
}
public long nextBlockGroupId() {
return blockGroupIdGenerator.nextValue();
}
public boolean isGenStampInFuture(Block block) {
if (isLegacyBlock(block)) {
return block.getGenerationStamp() > getGenerationStampV1();

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.util.SequentialNumber;
/**
* Generate the next valid block group ID by incrementing the maximum block
* group ID allocated so far, with the first 2^10 block group IDs reserved.
* HDFS-EC introduces a hierarchical protocol to name blocks and groups:
* Contiguous: {reserved block IDs | flag | block ID}
* Striped: {reserved block IDs | flag | block group ID | index in group}
*
* Following n bits of reserved block IDs, The (n+1)th bit in an ID
* distinguishes contiguous (0) and striped (1) blocks. For a striped block,
* bits (n+2) to (64-m) represent the ID of its block group, while the last m
* bits represent its index of the group. The value m is determined by the
* maximum number of blocks in a group (MAX_BLOCKS_IN_GROUP).
*/
@InterfaceAudience.Private
public class SequentialBlockGroupIdGenerator extends SequentialNumber {
private final BlockManager blockManager;
SequentialBlockGroupIdGenerator(BlockManager blockManagerRef) {
super(Long.MIN_VALUE);
this.blockManager = blockManagerRef;
}
@Override // NumberGenerator
public long nextValue() {
// Skip to next legitimate block group ID based on the naming protocol
while (super.getCurrentValue() % HdfsConstants.MAX_BLOCKS_IN_GROUP > 0) {
super.nextValue();
}
// Make sure there's no conflict with existing random block IDs
while (hasValidBlockInRange(super.getCurrentValue())) {
super.skipTo(super.getCurrentValue() +
HdfsConstants.MAX_BLOCKS_IN_GROUP);
}
if (super.getCurrentValue() >= 0) {
BlockManager.LOG.warn("All negative block group IDs are used, " +
"growing into positive IDs, " +
"which might conflict with non-erasure coded blocks.");
}
return super.getCurrentValue();
}
/**
*
* @param id The starting ID of the range
* @return true if any ID in the range
* {id, id+HdfsConstants.MAX_BLOCKS_IN_GROUP} is pointed-to by a file
*/
private boolean hasValidBlockInRange(long id) {
for (int i = 0; i < HdfsConstants.MAX_BLOCKS_IN_GROUP; i++) {
Block b = new Block(id + i);
if (blockManager.getBlockCollection(b) != null) {
return true;
}
}
return false;
}
}

View File

@ -19,7 +19,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.util.SequentialNumber;
/**
@ -54,6 +53,11 @@ public long nextValue() {
while(isValidBlock(b)) {
b.setBlockId(super.nextValue());
}
if (b.getBlockId() < 0) {
BlockManager.LOG.warn("All positive block IDs are used, " +
"wrapping to negative IDs, " +
"which might conflict with erasure coded block groups.");
}
return b.getBlockId();
}

View File

@ -2093,7 +2093,7 @@ Block prepareFileForTruncate(INodesInPath iip,
BlockInfoContiguous oldBlock = file.getLastBlock();
boolean shouldCopyOnTruncate = shouldCopyOnTruncate(file, oldBlock);
if(newBlock == null) {
newBlock = (shouldCopyOnTruncate) ? createNewBlock() :
newBlock = (shouldCopyOnTruncate) ? createNewBlock(file.isStriped()) :
new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
nextGenerationStamp(blockIdManager.isLegacyBlock(oldBlock)));
}
@ -3044,10 +3044,11 @@ boolean completeFile(final String src, String holder,
/**
* Create new block with a unique block id and a new generation stamp.
* @param isStriped is the file under striping or contiguous layout?
*/
Block createNewBlock() throws IOException {
assert hasWriteLock();
Block b = new Block(nextBlockId(), 0, 0);
Block b = new Block(nextBlockId(isStriped), 0, 0);
// Increment the generation stamp for every new block.
b.setGenerationStamp(nextGenerationStamp(false));
return b;
@ -5610,11 +5611,13 @@ long nextGenerationStamp(boolean legacyBlock)
/**
* Increments, logs and then returns the block ID
* @param isStriped is the file under striping or contiguous layout?
*/
private long nextBlockId() throws IOException {
private long nextBlockId(boolean isStriped) throws IOException {
assert hasWriteLock();
checkNameNodeSafeMode("Cannot get next block ID");
final long blockId = blockIdManager.nextBlockId();
final long blockId = isStriped ?
blockIdManager.nextBlockGroupId() : blockIdManager.nextBlockId();
getEditLog().logAllocateBlockId(blockId);
// NB: callers sync the log
return blockId;

View File

@ -34,12 +34,14 @@
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
@ -924,4 +926,13 @@ boolean isBlockInLatestSnapshot(BlockInfoContiguous block) {
return snapshotBlocks != null &&
Arrays.asList(snapshotBlocks).contains(block);
}
@VisibleForTesting
/**
* @return true if the file is in the striping layout.
*/
// TODO: move erasure coding policy to file XAttr (HDFS-7337)
public boolean isStriped() {
return getStoragePolicyID() == HdfsConstants.EC_STORAGE_POLICY_ID;
}
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class TestAddBlockgroup {
public static final Log LOG = LogFactory.getLog(TestAddBlockgroup.class);
private final short GROUP_SIZE = HdfsConstants.NUM_DATA_BLOCKS +
HdfsConstants.NUM_PARITY_BLOCKS;
private final short NUM_DATANODES = GROUP_SIZE;
private static final int BLOCKSIZE = 1024;
private static final short REPLICATION = 3;
private MiniDFSCluster cluster;
private Configuration conf;
@Before
public void setup() throws IOException {
conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES)
.build();
cluster.waitActive();
cluster.getFileSystem().setStoragePolicy(new Path("/"),
HdfsConstants.EC_STORAGE_POLICY_NAME);
}
@After
public void tearDown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testAddBlockGroup() throws Exception {
DistributedFileSystem fs = cluster.getFileSystem();
FSDirectory fsdir = cluster.getNamesystem().getFSDirectory();
final Path file1 = new Path("/file1");
DFSTestUtil.createFile(fs, file1, BLOCKSIZE * 2, REPLICATION, 0L);
INodeFile file1Node = fsdir.getINode4Write(file1.toString()).asFile();
BlockInfo[] file1Blocks = file1Node.getBlocks();
assertEquals(2, file1Blocks.length);
assertEquals(GROUP_SIZE, file1Blocks[0].numNodes());
assertEquals(HdfsConstants.MAX_BLOCKS_IN_GROUP,
file1Blocks[1].getBlockId() - file1Blocks[0].getBlockId());
}
}