HDFS-9542. Move BlockIdManager from FSNamesystem to BlockManager. Contributed by Jing Zhao.
This commit is contained in:
parent
468a53b22f
commit
c304890c8c
@ -939,6 +939,8 @@ Release 2.9.0 - UNRELEASED
|
||||
HDFS-9576: HTrace: collect position/length information on read operations
|
||||
(zhz via cmccabe)
|
||||
|
||||
HDFS-9542. Move BlockIdManager from FSNamesystem to BlockManager. (jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -163,7 +163,7 @@ public long getGenerationStampV2() {
|
||||
/**
|
||||
* Increments, logs and then returns the stamp
|
||||
*/
|
||||
public long nextGenerationStamp(boolean legacyBlock) throws IOException {
|
||||
long nextGenerationStamp(boolean legacyBlock) throws IOException {
|
||||
return legacyBlock ? getNextGenerationStampV1() :
|
||||
getNextGenerationStampV2();
|
||||
}
|
||||
@ -199,22 +199,19 @@ public long getGenerationStampV1Limit() {
|
||||
*
|
||||
* @return true if the block ID was randomly generated, false otherwise.
|
||||
*/
|
||||
public boolean isLegacyBlock(Block block) {
|
||||
boolean isLegacyBlock(Block block) {
|
||||
return block.getGenerationStamp() < getGenerationStampV1Limit();
|
||||
}
|
||||
|
||||
/**
|
||||
* Increments, logs and then returns the block ID
|
||||
*/
|
||||
public long nextContiguousBlockId() {
|
||||
return blockIdGenerator.nextValue();
|
||||
long nextBlockId(boolean isStriped) {
|
||||
return isStriped ? blockGroupIdGenerator.nextValue() :
|
||||
blockIdGenerator.nextValue();
|
||||
}
|
||||
|
||||
public long nextStripedBlockId() {
|
||||
return blockGroupIdGenerator.nextValue();
|
||||
}
|
||||
|
||||
public boolean isGenStampInFuture(Block block) {
|
||||
boolean isGenStampInFuture(Block block) {
|
||||
if (isLegacyBlock(block)) {
|
||||
return block.getGenerationStamp() > getGenerationStampV1();
|
||||
} else {
|
||||
@ -222,7 +219,7 @@ public boolean isGenStampInFuture(Block block) {
|
||||
}
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
void clear() {
|
||||
generationStampV1.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
|
||||
generationStampV2.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
|
||||
getBlockIdGenerator().setCurrentValue(SequentialBlockIdGenerator
|
||||
@ -240,7 +237,7 @@ public static boolean isStripedBlockID(long id) {
|
||||
* and the other 60 bits are 1. Group ID is the first 60 bits of any
|
||||
* data/parity block id in the same striped block group.
|
||||
*/
|
||||
public static long convertToStripedID(long id) {
|
||||
static long convertToStripedID(long id) {
|
||||
return id & (~HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
|
||||
}
|
||||
|
||||
|
@ -78,7 +78,6 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
|
||||
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
||||
@ -308,11 +307,14 @@ public int getPendingDataNodeMessageCount() {
|
||||
/** Check whether there are any non-EC blocks using StripedID */
|
||||
private boolean hasNonEcBlockUsingStripedID = false;
|
||||
|
||||
public BlockManager(final Namesystem namesystem, final Configuration conf)
|
||||
throws IOException {
|
||||
private final BlockIdManager blockIdManager;
|
||||
|
||||
public BlockManager(final Namesystem namesystem, boolean haEnabled,
|
||||
final Configuration conf) throws IOException {
|
||||
this.namesystem = namesystem;
|
||||
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
||||
heartbeatManager = datanodeManager.getHeartbeatManager();
|
||||
this.blockIdManager = new BlockIdManager(this);
|
||||
|
||||
startupDelayBlockDeletionInMs = conf.getLong(
|
||||
DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
|
||||
@ -387,7 +389,7 @@ public BlockManager(final Namesystem namesystem, final Configuration conf)
|
||||
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
|
||||
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
|
||||
|
||||
bmSafeMode = new BlockManagerSafeMode(this, namesystem, conf);
|
||||
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
|
||||
|
||||
LOG.info("defaultReplication = " + defaultReplication);
|
||||
LOG.info("maxReplication = " + maxReplication);
|
||||
@ -498,8 +500,7 @@ private boolean isBlockTokenEnabled() {
|
||||
|
||||
/** Should the access keys be updated? */
|
||||
boolean shouldUpdateBlockKey(final long updateTime) throws IOException {
|
||||
return isBlockTokenEnabled()? blockTokenSecretManager.updateKeys(updateTime)
|
||||
: false;
|
||||
return isBlockTokenEnabled() && blockTokenSecretManager.updateKeys(updateTime);
|
||||
}
|
||||
|
||||
public void activate(Configuration conf, long blockTotal) {
|
||||
@ -538,7 +539,7 @@ public BlockPlacementPolicy getBlockPlacementPolicy() {
|
||||
|
||||
/** Dump meta data to out. */
|
||||
public void metaSave(PrintWriter out) {
|
||||
assert namesystem.hasWriteLock();
|
||||
assert namesystem.hasWriteLock(); // TODO: block manager read lock and NS write lock
|
||||
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
|
||||
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
|
||||
datanodeManager.fetchDatanodes(live, dead, false);
|
||||
@ -1108,27 +1109,8 @@ public boolean isSufficientlyReplicated(BlockInfo b) {
|
||||
return countNodes(b).liveReplicas() >= replication;
|
||||
}
|
||||
|
||||
/**
|
||||
* return a list of blocks & their locations on <code>datanode</code> whose
|
||||
* total size is <code>size</code>
|
||||
*
|
||||
* @param datanode on which blocks are located
|
||||
* @param size total size of blocks
|
||||
*/
|
||||
public BlocksWithLocations getBlocks(DatanodeID datanode, long size
|
||||
) throws IOException {
|
||||
namesystem.checkOperation(OperationCategory.READ);
|
||||
namesystem.readLock();
|
||||
try {
|
||||
namesystem.checkOperation(OperationCategory.READ);
|
||||
return getBlocksWithLocations(datanode, size);
|
||||
} finally {
|
||||
namesystem.readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/** Get all blocks with location information from a datanode. */
|
||||
private BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
|
||||
public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
|
||||
final long size) throws UnregisteredNodeException {
|
||||
final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
|
||||
if (node == null) {
|
||||
@ -2353,8 +2335,7 @@ private void processFirstBlockReport(
|
||||
+ " on " + storageInfo.getDatanodeDescriptor() + " size " +
|
||||
iblk.getNumBytes() + " replicaState = " + reportedState);
|
||||
}
|
||||
if (shouldPostponeBlocksFromFuture &&
|
||||
namesystem.isGenStampInFuture(iblk)) {
|
||||
if (shouldPostponeBlocksFromFuture && isGenStampInFuture(iblk)) {
|
||||
queueReportedBlock(storageInfo, iblk, reportedState,
|
||||
QUEUE_REASON_FUTURE_GENSTAMP);
|
||||
continue;
|
||||
@ -2499,8 +2480,7 @@ private BlockInfo processReportedBlock(
|
||||
+ " replicaState = " + reportedState);
|
||||
}
|
||||
|
||||
if (shouldPostponeBlocksFromFuture &&
|
||||
namesystem.isGenStampInFuture(block)) {
|
||||
if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
|
||||
queueReportedBlock(storageInfo, block, reportedState,
|
||||
QUEUE_REASON_FUTURE_GENSTAMP);
|
||||
return null;
|
||||
@ -3360,8 +3340,7 @@ private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
|
||||
|
||||
private void removeStoredBlock(DatanodeStorageInfo storageInfo, Block block,
|
||||
DatanodeDescriptor node) {
|
||||
if (shouldPostponeBlocksFromFuture &&
|
||||
namesystem.isGenStampInFuture(block)) {
|
||||
if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
|
||||
queueReportedBlock(storageInfo, block, null,
|
||||
QUEUE_REASON_FUTURE_GENSTAMP);
|
||||
return;
|
||||
@ -4201,6 +4180,7 @@ public void shutdown() {
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
blockIdManager.clear();
|
||||
clearQueues();
|
||||
blocksMap.clear();
|
||||
}
|
||||
@ -4364,4 +4344,24 @@ void enqueue(Runnable action) throws InterruptedException {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public BlockIdManager getBlockIdManager() {
|
||||
return blockIdManager;
|
||||
}
|
||||
|
||||
public long nextGenerationStamp(boolean legacyBlock) throws IOException {
|
||||
return blockIdManager.nextGenerationStamp(legacyBlock);
|
||||
}
|
||||
|
||||
public boolean isLegacyBlock(Block block) {
|
||||
return blockIdManager.isLegacyBlock(block);
|
||||
}
|
||||
|
||||
public long nextBlockId(boolean isStriped) {
|
||||
return blockIdManager.nextBlockId(isStriped);
|
||||
}
|
||||
|
||||
boolean isGenStampInFuture(Block block) {
|
||||
return blockIdManager.isGenStampInFuture(block);
|
||||
}
|
||||
}
|
||||
|
@ -115,10 +115,10 @@ enum BMSafeModeStatus {
|
||||
private final boolean inRollBack;
|
||||
|
||||
BlockManagerSafeMode(BlockManager blockManager, Namesystem namesystem,
|
||||
Configuration conf) {
|
||||
boolean haEnabled, Configuration conf) {
|
||||
this.blockManager = blockManager;
|
||||
this.namesystem = namesystem;
|
||||
this.haEnabled = namesystem.isHaEnabled();
|
||||
this.haEnabled = haEnabled;
|
||||
this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
|
||||
DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
|
||||
if (this.threshold > 1.0) {
|
||||
@ -473,7 +473,7 @@ void checkBlocksWithFutureGS(BlockReportReplica brr) {
|
||||
|
||||
if (!blockManager.getShouldPostponeBlocksFromFuture() &&
|
||||
!inRollBack &&
|
||||
namesystem.isGenStampInFuture(brr)) {
|
||||
blockManager.isGenStampInFuture(brr)) {
|
||||
numberOfBytesInFutureBlocks.addAndGet(brr.getBytesOnDisk());
|
||||
}
|
||||
}
|
||||
|
@ -228,7 +228,7 @@ static Block prepareFileForTruncate(FSNamesystem fsn, INodesInPath iip,
|
||||
if (newBlock == null) {
|
||||
newBlock = (shouldCopyOnTruncate) ? fsn.createNewBlock(false)
|
||||
: new Block(oldBlock.getBlockId(), oldBlock.getNumBytes(),
|
||||
fsn.nextGenerationStamp(fsn.getBlockIdManager().isLegacyBlock(
|
||||
fsn.nextGenerationStamp(fsn.getBlockManager().isLegacyBlock(
|
||||
oldBlock)));
|
||||
}
|
||||
|
||||
|
@ -48,6 +48,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
|
||||
@ -116,12 +117,14 @@ public class FSEditLogLoader {
|
||||
static final long REPLAY_TRANSACTION_LOG_INTERVAL = 1000; // 1sec
|
||||
|
||||
private final FSNamesystem fsNamesys;
|
||||
private final BlockManager blockManager;
|
||||
private long lastAppliedTxId;
|
||||
/** Total number of end transactions loaded. */
|
||||
private int totalEdits = 0;
|
||||
|
||||
public FSEditLogLoader(FSNamesystem fsNamesys, long lastAppliedTxId) {
|
||||
this.fsNamesys = fsNamesys;
|
||||
this.blockManager = fsNamesys.getBlockManager();
|
||||
this.lastAppliedTxId = lastAppliedTxId;
|
||||
}
|
||||
|
||||
@ -586,7 +589,7 @@ fsDir, renameReservedPathsOnUpgrade(deleteOp.path, logVersion),
|
||||
}
|
||||
case OP_SET_GENSTAMP_V1: {
|
||||
SetGenstampV1Op setGenstampV1Op = (SetGenstampV1Op)op;
|
||||
fsNamesys.getBlockIdManager().setGenerationStampV1(
|
||||
blockManager.getBlockIdManager().setGenerationStampV1(
|
||||
setGenstampV1Op.genStampV1);
|
||||
break;
|
||||
}
|
||||
@ -794,7 +797,7 @@ fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
|
||||
}
|
||||
case OP_SET_GENSTAMP_V2: {
|
||||
SetGenstampV2Op setGenstampV2Op = (SetGenstampV2Op) op;
|
||||
fsNamesys.getBlockIdManager().setGenerationStampV2(
|
||||
blockManager.getBlockIdManager().setGenerationStampV2(
|
||||
setGenstampV2Op.genStampV2);
|
||||
break;
|
||||
}
|
||||
@ -803,10 +806,10 @@ fsDir, renameReservedPathsOnUpgrade(timesOp.path, logVersion),
|
||||
if (BlockIdManager.isStripedBlockID(allocateBlockIdOp.blockId)) {
|
||||
// ALLOCATE_BLOCK_ID is added for sequential block id, thus if the id
|
||||
// is negative, it must belong to striped blocks
|
||||
fsNamesys.getBlockIdManager().setLastAllocatedStripedBlockId(
|
||||
blockManager.getBlockIdManager().setLastAllocatedStripedBlockId(
|
||||
allocateBlockIdOp.blockId);
|
||||
} else {
|
||||
fsNamesys.getBlockIdManager().setLastAllocatedContiguousBlockId(
|
||||
blockManager.getBlockIdManager().setLastAllocatedContiguousBlockId(
|
||||
allocateBlockIdOp.blockId);
|
||||
}
|
||||
break;
|
||||
|
@ -48,11 +48,11 @@
|
||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutFlags;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
@ -344,27 +344,26 @@ public void load(File curFile) throws IOException {
|
||||
|
||||
// read in the last generation stamp for legacy blocks.
|
||||
long genstamp = in.readLong();
|
||||
namesystem.getBlockIdManager().setGenerationStampV1(genstamp);
|
||||
final BlockIdManager blockIdManager = namesystem.getBlockManager()
|
||||
.getBlockIdManager();
|
||||
blockIdManager.setGenerationStampV1(genstamp);
|
||||
|
||||
if (NameNodeLayoutVersion.supports(
|
||||
LayoutVersion.Feature.SEQUENTIAL_BLOCK_ID, imgVersion)) {
|
||||
// read the starting generation stamp for sequential block IDs
|
||||
genstamp = in.readLong();
|
||||
namesystem.getBlockIdManager().setGenerationStampV2(genstamp);
|
||||
blockIdManager.setGenerationStampV2(genstamp);
|
||||
|
||||
// read the last generation stamp for blocks created after
|
||||
// the switch to sequential block IDs.
|
||||
long stampAtIdSwitch = in.readLong();
|
||||
namesystem.getBlockIdManager().setGenerationStampV1Limit(stampAtIdSwitch);
|
||||
blockIdManager.setGenerationStampV1Limit(stampAtIdSwitch);
|
||||
|
||||
// read the max sequential block ID.
|
||||
long maxSequentialBlockId = in.readLong();
|
||||
namesystem.getBlockIdManager().setLastAllocatedContiguousBlockId(
|
||||
maxSequentialBlockId);
|
||||
blockIdManager.setLastAllocatedContiguousBlockId(maxSequentialBlockId);
|
||||
} else {
|
||||
|
||||
long startingGenStamp = namesystem.getBlockIdManager()
|
||||
.upgradeGenerationStampToV2();
|
||||
long startingGenStamp = blockIdManager.upgradeGenerationStampToV2();
|
||||
// This is an upgrade.
|
||||
LOG.info("Upgrading to sequential block IDs. Generation stamp " +
|
||||
"for new blocks set to " + startingGenStamp);
|
||||
@ -1269,10 +1268,12 @@ void save(File newFile, FSImageCompression compression) throws IOException {
|
||||
out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
|
||||
.getNamespaceID());
|
||||
out.writeLong(numINodes);
|
||||
out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV1());
|
||||
out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampV2());
|
||||
out.writeLong(sourceNamesystem.getBlockIdManager().getGenerationStampAtblockIdSwitch());
|
||||
out.writeLong(sourceNamesystem.getBlockIdManager().getLastAllocatedContiguousBlockId());
|
||||
final BlockIdManager blockIdManager = sourceNamesystem.getBlockManager()
|
||||
.getBlockIdManager();
|
||||
out.writeLong(blockIdManager.getGenerationStampV1());
|
||||
out.writeLong(blockIdManager.getGenerationStampV2());
|
||||
out.writeLong(blockIdManager.getGenerationStampAtblockIdSwitch());
|
||||
out.writeLong(blockIdManager.getLastAllocatedContiguousBlockId());
|
||||
out.writeLong(context.getTxId());
|
||||
out.writeLong(sourceNamesystem.dir.getLastInodeId());
|
||||
|
||||
|
@ -293,7 +293,7 @@ public int compare(FileSummary.Section s1, FileSummary.Section s2) {
|
||||
|
||||
private void loadNameSystemSection(InputStream in) throws IOException {
|
||||
NameSystemSection s = NameSystemSection.parseDelimitedFrom(in);
|
||||
BlockIdManager blockIdManager = fsn.getBlockIdManager();
|
||||
BlockIdManager blockIdManager = fsn.getBlockManager().getBlockIdManager();
|
||||
blockIdManager.setGenerationStampV1(s.getGenstampV1());
|
||||
blockIdManager.setGenerationStampV2(s.getGenstampV2());
|
||||
blockIdManager.setGenerationStampV1Limit(s.getGenstampV1Limit());
|
||||
@ -548,7 +548,7 @@ private void saveNameSystemSection(FileSummary.Builder summary)
|
||||
throws IOException {
|
||||
final FSNamesystem fsn = context.getSourceNamesystem();
|
||||
OutputStream out = sectionOutputStream;
|
||||
BlockIdManager blockIdManager = fsn.getBlockIdManager();
|
||||
BlockIdManager blockIdManager = fsn.getBlockManager().getBlockIdManager();
|
||||
NameSystemSection.Builder b = NameSystemSection.newBuilder()
|
||||
.setGenstampV1(blockIdManager.getGenerationStampV1())
|
||||
.setGenstampV1Limit(blockIdManager.getGenerationStampV1Limit())
|
||||
|
@ -195,7 +195,6 @@
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager.SecretManagerState;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockIdManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
@ -236,6 +235,7 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
|
||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
@ -316,8 +316,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||
NameNodeMXBean {
|
||||
public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
|
||||
|
||||
private final BlockIdManager blockIdManager;
|
||||
|
||||
boolean isAuditEnabled() {
|
||||
return (!isDefaultAuditLogger || auditLog.isInfoEnabled())
|
||||
&& !auditLoggers.isEmpty();
|
||||
@ -557,7 +555,6 @@ protected void setImageLoaded(boolean flag) {
|
||||
void clear() {
|
||||
dir.reset();
|
||||
dtSecretManager.reset();
|
||||
blockIdManager.clear();
|
||||
leaseManager.removeAllLeases();
|
||||
snapshotManager.clearSnapshottableDirs();
|
||||
cacheManager.clear();
|
||||
@ -570,8 +567,7 @@ void clear() {
|
||||
LeaseManager getLeaseManager() {
|
||||
return leaseManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
public boolean isHaEnabled() {
|
||||
return haEnabled;
|
||||
}
|
||||
@ -728,9 +724,8 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
||||
}
|
||||
|
||||
// block manager needs the haEnabled initialized
|
||||
this.blockManager = new BlockManager(this, conf);
|
||||
this.blockManager = new BlockManager(this, haEnabled, conf);
|
||||
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
|
||||
this.blockIdManager = new BlockIdManager(blockManager);
|
||||
|
||||
// Get the checksum type from config
|
||||
String checksumTypeStr = conf.get(DFS_CHECKSUM_TYPE_KEY,
|
||||
@ -1253,8 +1248,7 @@ void stopStandbyServices() throws IOException {
|
||||
getFSImage().editLog.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
public void checkOperation(OperationCategory op) throws StandbyException {
|
||||
if (haContext != null) {
|
||||
// null in some unit tests
|
||||
@ -1542,8 +1536,7 @@ void close() {
|
||||
public boolean isRunning() {
|
||||
return fsRunning;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
public boolean isInStandbyState() {
|
||||
if (haContext == null || haContext.getState() == null) {
|
||||
// We're still starting up. In this case, if HA is
|
||||
@ -1555,6 +1548,25 @@ public boolean isInStandbyState() {
|
||||
return HAServiceState.STANDBY == haContext.getState().getServiceState();
|
||||
}
|
||||
|
||||
/**
|
||||
* return a list of blocks & their locations on <code>datanode</code> whose
|
||||
* total size is <code>size</code>
|
||||
*
|
||||
* @param datanode on which blocks are located
|
||||
* @param size total size of blocks
|
||||
*/
|
||||
public BlocksWithLocations getBlocks(DatanodeID datanode, long size)
|
||||
throws IOException {
|
||||
checkOperation(OperationCategory.READ);
|
||||
readLock();
|
||||
try {
|
||||
checkOperation(OperationCategory.READ);
|
||||
return getBlockManager().getBlocksWithLocations(datanode, size);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Dump all metadata into specified file
|
||||
*/
|
||||
@ -3041,7 +3053,7 @@ boolean internalReleaseLease(Lease lease, String src, INodesInPath iip,
|
||||
}
|
||||
// start recovery of the last block for this file
|
||||
long blockRecoveryId = nextGenerationStamp(
|
||||
blockIdManager.isLegacyBlock(lastBlock));
|
||||
blockManager.isLegacyBlock(lastBlock));
|
||||
lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile);
|
||||
if(copyOnTruncate) {
|
||||
lastBlock.setGenerationStamp(blockRecoveryId);
|
||||
@ -4482,11 +4494,11 @@ public String getTopUserOpCounts() {
|
||||
* Increments, logs and then returns the stamp
|
||||
*/
|
||||
long nextGenerationStamp(boolean legacyBlock)
|
||||
throws IOException, SafeModeException {
|
||||
throws IOException {
|
||||
assert hasWriteLock();
|
||||
checkNameNodeSafeMode("Cannot get next generation stamp");
|
||||
|
||||
long gs = blockIdManager.nextGenerationStamp(legacyBlock);
|
||||
long gs = blockManager.nextGenerationStamp(legacyBlock);
|
||||
if (legacyBlock) {
|
||||
getEditLog().logGenerationStampV1(gs);
|
||||
} else {
|
||||
@ -4504,8 +4516,7 @@ long nextGenerationStamp(boolean legacyBlock)
|
||||
private long nextBlockId(boolean isStriped) throws IOException {
|
||||
assert hasWriteLock();
|
||||
checkNameNodeSafeMode("Cannot get next block ID");
|
||||
final long blockId = isStriped ?
|
||||
blockIdManager.nextStripedBlockId() : blockIdManager.nextContiguousBlockId();
|
||||
final long blockId = blockManager.nextBlockId(isStriped);
|
||||
getEditLog().logAllocateBlockId(blockId);
|
||||
// NB: callers sync the log
|
||||
return blockId;
|
||||
@ -4632,7 +4643,8 @@ LocatedBlock bumpBlockGenerationStamp(ExtendedBlock block,
|
||||
final INodeFile file = checkUCBlock(block, clientName);
|
||||
|
||||
// get a new generation stamp and an access token
|
||||
block.setGenerationStamp(nextGenerationStamp(blockIdManager.isLegacyBlock(block.getLocalBlock())));
|
||||
block.setGenerationStamp(nextGenerationStamp(
|
||||
blockManager.isLegacyBlock(block.getLocalBlock())));
|
||||
|
||||
locatedBlock = BlockManager.newLocatedBlock(
|
||||
block, file.getLastBlock(), null, -1);
|
||||
@ -5472,10 +5484,6 @@ public BlockManager getBlockManager() {
|
||||
return blockManager;
|
||||
}
|
||||
|
||||
public BlockIdManager getBlockIdManager() {
|
||||
return blockIdManager;
|
||||
}
|
||||
|
||||
/** @return the FSDirectory. */
|
||||
public FSDirectory getFSDirectory() {
|
||||
return dir;
|
||||
@ -5611,11 +5619,6 @@ public synchronized void verifyToken(DelegationTokenIdentifier identifier,
|
||||
throw it;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isGenStampInFuture(Block block) {
|
||||
return blockIdManager.isGenStampInFuture(block);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public EditLogTailer getEditLogTailer() {
|
||||
|
@ -560,7 +560,7 @@ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
|
||||
}
|
||||
checkNNStartup();
|
||||
namesystem.checkSuperuserPrivilege();
|
||||
return namesystem.getBlockManager().getBlocks(datanode, size);
|
||||
return namesystem.getBlocks(datanode, size);
|
||||
}
|
||||
|
||||
@Override // NamenodeProtocol
|
||||
|
@ -41,14 +41,8 @@ public interface Namesystem extends RwLock, SafeMode {
|
||||
/** @return the block pool ID */
|
||||
String getBlockPoolId();
|
||||
|
||||
boolean isInStandbyState();
|
||||
|
||||
boolean isGenStampInFuture(Block block);
|
||||
|
||||
BlockCollection getBlockCollection(long id);
|
||||
|
||||
void checkOperation(OperationCategory read) throws StandbyException;
|
||||
|
||||
void startSecretManagerIfNecessary();
|
||||
|
||||
/**
|
||||
@ -67,11 +61,6 @@ ErasureCodingPolicy getErasureCodingPolicyForPath(String src)
|
||||
|
||||
HAContext getHAContext();
|
||||
|
||||
/**
|
||||
* @return true if the HA is enabled else false
|
||||
*/
|
||||
boolean isHaEnabled();
|
||||
|
||||
/**
|
||||
* @return Whether the namenode is transitioning to active state and is in the
|
||||
* middle of the starting active services.
|
||||
|
@ -130,7 +130,7 @@ public void setupMockCluster() throws IOException {
|
||||
Mockito.doReturn(true).when(fsn).hasWriteLock();
|
||||
Mockito.doReturn(true).when(fsn).hasReadLock();
|
||||
Mockito.doReturn(true).when(fsn).isRunning();
|
||||
bm = new BlockManager(fsn, conf);
|
||||
bm = new BlockManager(fsn, false, conf);
|
||||
final String[] racks = {
|
||||
"/rackA",
|
||||
"/rackA",
|
||||
|
@ -94,16 +94,16 @@ public void setupMockCluster() throws IOException {
|
||||
doReturn(true).when(fsn).hasWriteLock();
|
||||
doReturn(true).when(fsn).hasReadLock();
|
||||
doReturn(true).when(fsn).isRunning();
|
||||
doReturn(true).when(fsn).isGenStampInFuture(any(Block.class));
|
||||
NameNode.initMetrics(conf, NamenodeRole.NAMENODE);
|
||||
|
||||
bm = spy(new BlockManager(fsn, conf));
|
||||
bm = spy(new BlockManager(fsn, false, conf));
|
||||
doReturn(true).when(bm).isGenStampInFuture(any(Block.class));
|
||||
dn = spy(bm.getDatanodeManager());
|
||||
Whitebox.setInternalState(bm, "datanodeManager", dn);
|
||||
// the datanode threshold is always met
|
||||
when(dn.getNumLiveDataNodes()).thenReturn(DATANODE_NUM);
|
||||
|
||||
bmSafeMode = new BlockManagerSafeMode(bm, fsn, conf);
|
||||
bmSafeMode = new BlockManagerSafeMode(bm, fsn, false, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1315,7 +1315,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
||||
FSNamesystem mockNS = mock(FSNamesystem.class);
|
||||
when(mockNS.hasWriteLock()).thenReturn(true);
|
||||
when(mockNS.hasReadLock()).thenReturn(true);
|
||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
|
||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||
|
||||
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
|
||||
@ -1365,7 +1365,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
|
||||
Namesystem mockNS = mock(Namesystem.class);
|
||||
when(mockNS.hasWriteLock()).thenReturn(true);
|
||||
|
||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
|
||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||
|
||||
long blkID1 = ThreadLocalRandom.current().nextLong();
|
||||
@ -1437,7 +1437,7 @@ public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
|
||||
Namesystem mockNS = mock(Namesystem.class);
|
||||
when(mockNS.hasReadLock()).thenReturn(true);
|
||||
|
||||
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
|
||||
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
|
||||
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
|
||||
|
||||
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
|
||||
|
@ -82,8 +82,8 @@ public void setup() throws Exception {
|
||||
cluster.waitActive();
|
||||
|
||||
fs = cluster.getFileSystem();
|
||||
blockGrpIdGenerator = cluster.getNamesystem().getBlockIdManager()
|
||||
.getBlockGroupIdGenerator();
|
||||
blockGrpIdGenerator = cluster.getNamesystem().getBlockManager()
|
||||
.getBlockIdManager().getBlockGroupIdGenerator();
|
||||
fs.mkdirs(ecDir);
|
||||
cluster.getFileSystem().getClient()
|
||||
.setErasureCodingPolicy("/ecDir", null);
|
||||
@ -179,10 +179,10 @@ public void testTriggerBlockGroupIdCollisionWithLegacyBlockId()
|
||||
// collision during blockGroup Id generation
|
||||
FSNamesystem fsn = cluster.getNamesystem();
|
||||
// Replace SequentialBlockIdGenerator with a spy
|
||||
SequentialBlockIdGenerator blockIdGenerator = spy(fsn.getBlockIdManager()
|
||||
.getBlockIdGenerator());
|
||||
Whitebox.setInternalState(fsn.getBlockIdManager(), "blockIdGenerator",
|
||||
blockIdGenerator);
|
||||
SequentialBlockIdGenerator blockIdGenerator = spy(fsn.getBlockManager()
|
||||
.getBlockIdManager().getBlockIdGenerator());
|
||||
Whitebox.setInternalState(fsn.getBlockManager().getBlockIdManager(),
|
||||
"blockIdGenerator", blockIdGenerator);
|
||||
SequentialBlockIdGenerator spySequentialBlockIdGenerator = new SequentialBlockIdGenerator(
|
||||
null) {
|
||||
@Override
|
||||
|
@ -116,8 +116,8 @@ public void testTriggerBlockIdCollision() throws IOException {
|
||||
|
||||
// Rewind the block ID counter in the name system object. This will result
|
||||
// in block ID collisions when we try to allocate new blocks.
|
||||
SequentialBlockIdGenerator blockIdGenerator = fsn.getBlockIdManager()
|
||||
.getBlockIdGenerator();
|
||||
SequentialBlockIdGenerator blockIdGenerator = fsn.getBlockManager()
|
||||
.getBlockIdManager().getBlockIdGenerator();
|
||||
blockIdGenerator.setCurrentValue(blockIdGenerator.getCurrentValue() - 5);
|
||||
|
||||
// Trigger collisions by creating a new file.
|
||||
|
@ -1020,7 +1020,7 @@ public void testTruncateRecovery() throws IOException {
|
||||
assertThat(truncateBlock.getNumBytes(),
|
||||
is(oldBlock.getNumBytes()));
|
||||
assertThat(truncateBlock.getGenerationStamp(),
|
||||
is(fsn.getBlockIdManager().getGenerationStampV2()));
|
||||
is(fsn.getBlockManager().getBlockIdManager().getGenerationStampV2()));
|
||||
assertThat(file.getLastBlock().getBlockUCState(),
|
||||
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
|
||||
long blockRecoveryId = file.getLastBlock().getUnderConstructionFeature()
|
||||
@ -1054,7 +1054,7 @@ public void testTruncateRecovery() throws IOException {
|
||||
assertThat(truncateBlock.getNumBytes() < oldBlock.getNumBytes(),
|
||||
is(true));
|
||||
assertThat(truncateBlock.getGenerationStamp(),
|
||||
is(fsn.getBlockIdManager().getGenerationStampV2()));
|
||||
is(fsn.getBlockManager().getBlockIdManager().getGenerationStampV2()));
|
||||
assertThat(file.getLastBlock().getBlockUCState(),
|
||||
is(HdfsServerConstants.BlockUCState.UNDER_RECOVERY));
|
||||
long blockRecoveryId = file.getLastBlock().getUnderConstructionFeature()
|
||||
|
@ -517,8 +517,8 @@ public void testCancelSaveNamespace() throws Exception {
|
||||
FSNamesystem spyFsn = spy(fsn);
|
||||
final FSNamesystem finalFsn = spyFsn;
|
||||
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
|
||||
BlockIdManager bid = spy(spyFsn.getBlockIdManager());
|
||||
Whitebox.setInternalState(finalFsn, "blockIdManager", bid);
|
||||
BlockIdManager bid = spy(spyFsn.getBlockManager().getBlockIdManager());
|
||||
Whitebox.setInternalState(finalFsn.getBlockManager(), "blockIdManager", bid);
|
||||
doAnswer(delayer).when(bid).getGenerationStampV2();
|
||||
|
||||
ExecutorService pool = Executors.newFixedThreadPool(2);
|
||||
|
Loading…
Reference in New Issue
Block a user