HDFS-2493. Remove reference to FSNamesystem in blockmanagement classes.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190491 13f79535-47bb-0310-9956-ffa450edef68
parent cbdb07f4ca
commit 9692cfc9ae
CHANGES.txt
@@ -825,6 +825,9 @@ Release 0.23.0 - Unreleased
 
     HDFS-2507. Allow saveNamespace operations to be canceled. (todd)
 
+    HDFS-2493. Remove reference to FSNamesystem in blockmanagement classes.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
BlockManager.java
@@ -53,7 +53,7 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
@@ -72,8 +72,6 @@
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
- * This class is a helper class for {@link FSNamesystem} and requires several
- * methods to be called with lock held on {@link FSNamesystem}.
  */
 @InterfaceAudience.Private
 public class BlockManager {
@@ -176,15 +174,16 @@ public long getExcessBlocksCount() {
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
 
-  public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
-    namesystem = fsn;
-    datanodeManager = new DatanodeManager(this, fsn, conf);
+  public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
+      final Configuration conf) throws IOException {
+    this.namesystem = namesystem;
+    datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
     invalidateBlocks = new InvalidateBlocks(datanodeManager);
 
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
     blockplacement = BlockPlacementPolicy.getInstance(
-        conf, fsn, datanodeManager.getNetworkTopology());
+        conf, stats, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@@ -2580,7 +2579,7 @@ int computeDatanodeWork() throws IOException {
 
     workFound = this.computeReplicationWork(blocksToProcess);
 
-    // Update FSNamesystemMetrics counters
+    // Update counters
     namesystem.writeLock();
     try {
       this.updateState();
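
Note on the pattern: the constructor change above is the heart of this patch. BlockManager now receives the two narrow interfaces that FSNamesystem implements (Namesystem and FSClusterStats) instead of FSNamesystem itself, breaking the compile-time dependency from blockmanagement back onto the namenode class. A minimal sketch of the idea, using illustrative interface bodies rather than the real Hadoop definitions:

// Sketch only: stand-in types, not the actual Hadoop interfaces.
interface Namesystem {
  boolean isRunning();
  void writeLock();
  void writeUnlock();
}

interface FSClusterStats {
  int getTotalLoad();
}

// Stand-in for FSNamesystem: a single object playing both roles.
class NameSystemImpl implements Namesystem, FSClusterStats {
  public boolean isRunning() { return true; }
  public void writeLock() { }
  public void writeUnlock() { }
  public int getTotalLoad() { return 0; }
}

// Stand-in for BlockManager: it only sees the narrow interfaces.
class Manager {
  private final Namesystem namesystem;
  private final FSClusterStats stats;

  Manager(final Namesystem namesystem, final FSClusterStats stats) {
    this.namesystem = namesystem;
    this.stats = stats;
  }
}

class PatternDemo {
  public static void main(String[] args) {
    NameSystemImpl fsn = new NameSystemImpl();
    // Mirrors "new BlockManager(this, this, conf)" in FSNamesystem:
    // the same instance is passed once per role it implements.
    Manager bm = new Manager(fsn, fsn);
    System.out.println("constructed: " + bm);
  }
}

Because FSNamesystem implements both interfaces, existing call sites only need to pass the same object twice, as the FSNamesystem and TestBlockManager hunks below do.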
DatanodeManager.java
@@ -50,8 +50,8 @@
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -79,7 +79,7 @@
 public class DatanodeManager {
   static final Log LOG = LogFactory.getLog(DatanodeManager.class);
 
-  private final FSNamesystem namesystem;
+  private final Namesystem namesystem;
   private final BlockManager blockManager;
 
   private final HeartbeatManager heartbeatManager;
@@ -125,12 +125,12 @@ public class DatanodeManager {
   final int blockInvalidateLimit;
 
   DatanodeManager(final BlockManager blockManager,
-      final FSNamesystem namesystem, final Configuration conf
+      final Namesystem namesystem, final Configuration conf
       ) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
 
-    this.heartbeatManager = new HeartbeatManager(namesystem, conf);
+    this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 
     this.hostsReader = new HostsFileReader(
         conf.get(DFSConfigKeys.DFS_HOSTS, ""),
@@ -164,7 +164,8 @@ public class DatanodeManager {
   private Daemon decommissionthread = null;
 
   void activate(final Configuration conf) {
-    this.decommissionthread = new Daemon(new DecommissionManager(namesystem).new Monitor(
+    final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
+    this.decommissionthread = new Daemon(dm.new Monitor(
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
             DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
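
Note on the syntax: Monitor is a non-static inner class of DecommissionManager, so it must be instantiated through an enclosing instance using Java's qualified creation form, outer.new Inner(). The rewritten activate() just names the enclosing instance before using it. A small sketch with illustrative names:

// Sketch only: illustrative stand-ins for DecommissionManager.Monitor.
class Outer {
  private final String name;

  Outer(String name) { this.name = name; }

  class Inner implements Runnable {
    public void run() {
      // A non-static inner instance implicitly captures "Outer.this".
      System.out.println("monitor running inside " + name);
    }
  }
}

class InnerDemo {
  public static void main(String[] args) {
    Outer dm = new Outer("decommission-manager");
    // Parallels "dm.new Monitor(...)" in activate() above.
    new Thread(dm.new Inner()).start();
  }
}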
DatanodeStatistics.java
@@ -56,4 +56,7 @@ public interface DatanodeStatistics {
    * The block related entries are set to -1.
    */
   public long[] getStats();
+
+  /** @return the expired heartbeats */
+  public int getExpiredHeartbeats();
 }
DecommissionManager.java
@@ -23,7 +23,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 
 /**
  * Manage node decommissioning.
@@ -33,10 +33,13 @@
 class DecommissionManager {
   static final Log LOG = LogFactory.getLog(DecommissionManager.class);
 
-  private final FSNamesystem fsnamesystem;
+  private final Namesystem namesystem;
+  private final BlockManager blockmanager;
 
-  DecommissionManager(final FSNamesystem namesystem) {
-    this.fsnamesystem = namesystem;
+  DecommissionManager(final Namesystem namesystem,
+      final BlockManager blockmanager) {
+    this.namesystem = namesystem;
+    this.blockmanager = blockmanager;
   }
 
   /** Periodically check decommission status. */
@@ -61,12 +64,12 @@ class Monitor implements Runnable {
      */
     @Override
     public void run() {
-      for(; fsnamesystem.isRunning(); ) {
-        fsnamesystem.writeLock();
+      for(; namesystem.isRunning(); ) {
+        namesystem.writeLock();
         try {
           check();
         } finally {
-          fsnamesystem.writeUnlock();
+          namesystem.writeUnlock();
         }
 
         try {
@@ -78,7 +81,7 @@ public void run() {
     }
 
     private void check() {
-      final DatanodeManager dm = fsnamesystem.getBlockManager().getDatanodeManager();
+      final DatanodeManager dm = blockmanager.getDatanodeManager();
       int count = 0;
       for(Map.Entry<String, DatanodeDescriptor> entry
           : dm.getDatanodeCyclicIteration(firstkey)) {
HeartbeatManager.java
@@ -27,7 +27,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.util.Daemon;
 
 /**
@@ -55,14 +55,17 @@ class HeartbeatManager implements DatanodeStatistics {
   /** Heartbeat monitor thread */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
 
-  final FSNamesystem namesystem;
+  final Namesystem namesystem;
+  final BlockManager blockManager;
 
-  HeartbeatManager(final FSNamesystem namesystem, final Configuration conf) {
+  HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
+      final Configuration conf) {
     this.heartbeatRecheckInterval = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
 
     this.namesystem = namesystem;
+    this.blockManager = blockManager;
   }
 
   void activate(Configuration conf) {
@@ -136,6 +139,11 @@ public synchronized long[] getStats() {
                        getBlockPoolUsed()};
   }
 
+  @Override
+  public synchronized int getExpiredHeartbeats() {
+    return stats.expiredHeartbeats;
+  }
+
   synchronized void register(final DatanodeDescriptor d) {
     if (!datanodes.contains(d)) {
       addDatanode(d);
@@ -191,7 +199,7 @@ synchronized void stopDecommission(final DatanodeDescriptor node) {
    * effect causes more datanodes to be declared dead.
    */
   void heartbeatCheck() {
-    final DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
+    final DatanodeManager dm = blockManager.getDatanodeManager();
     // It's OK to check safe mode w/o taking the lock here, we re-check
     // for safe mode after taking the lock before removing a datanode.
     if (namesystem.isInSafeMode()) {
@@ -204,7 +212,7 @@ void heartbeatCheck() {
       synchronized(this) {
         for (DatanodeDescriptor d : datanodes) {
           if (dm.isDatanodeDead(d)) {
-            namesystem.incrExpiredHeartbeats();
+            stats.incrExpiredHeartbeats();
             dead = d;
             break;
           }
@@ -244,8 +252,7 @@ public void run() {
           heartbeatCheck();
           lastHeartbeatCheck = now;
         }
-        if (namesystem.getBlockManager().shouldUpdateBlockKey(
-            now - lastBlockKeyUpdate)) {
+        if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
          synchronized(HeartbeatManager.this) {
            for(DatanodeDescriptor d : datanodes) {
              d.needKeyUpdate = true;
@@ -274,6 +281,8 @@ private static class Stats {
     private long blockPoolUsed = 0L;
     private int xceiverCount = 0;
 
+    private int expiredHeartbeats = 0;
+
     private void add(final DatanodeDescriptor node) {
       capacityUsed += node.getDfsUsed();
       blockPoolUsed += node.getBlockPoolUsed();
@@ -297,5 +306,10 @@ private void subtract(final DatanodeDescriptor node) {
         capacityTotal -= node.getDfsUsed();
       }
     }
+
+    /** Increment expired heartbeat counter. */
+    private void incrExpiredHeartbeats() {
+      expiredHeartbeats++;
+    }
   }
 }
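
Note on the counter move: expired-heartbeat accounting now lives entirely inside HeartbeatManager. The monitor thread increments a private field on the Stats object under synchronization, and readers reach the value only through the new DatanodeStatistics.getExpiredHeartbeats() method. A compressed sketch of that flow, with simplified stand-in types (the real classes carry many more fields and methods):

// Sketch only: collapses HeartbeatManager and its Stats into one class.
interface DatanodeStatistics {
  int getExpiredHeartbeats();
}

class HeartbeatTracker implements DatanodeStatistics {
  private int expiredHeartbeats = 0;

  // Called from heartbeatCheck() when a datanode is declared dead.
  synchronized void incrExpiredHeartbeats() {
    expiredHeartbeats++;
  }

  @Override
  public synchronized int getExpiredHeartbeats() {
    return expiredHeartbeats;
  }
}

FSNamesystem no longer needs an incrExpiredHeartbeats() entry point; it simply reads the value on demand, as the next file shows.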
FSNamesystem.java
@@ -17,6 +17,45 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedWriter;
@@ -68,7 +107,6 @@
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -119,7 +157,6 @@
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -204,8 +241,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
   private UserGroupInformation fsOwner;
   private String supergroup;
   private PermissionStatus defaultPermission;
-  // FSNamesystemMetrics counter variables
-  @Metric private MutableCounterInt expiredHeartbeats;
 
   // Scan interval is not configurable.
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
@@ -313,7 +348,7 @@ private void initialize(Configuration conf, FSImage fsImage)
         DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
         DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
     this.systemStart = now();
-    this.blockManager = new BlockManager(this, conf);
+    this.blockManager = new BlockManager(this, this, conf);
     this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
     this.fsLock = new ReentrantReadWriteLock(true); // fair locking
     setConfigurationParameters(conf);
@@ -2618,9 +2653,9 @@ public long getMissingBlocksCount() {
     return blockManager.getMissingBlocksCount();
   }
 
-  /** Increment expired heartbeat counter. */
-  public void incrExpiredHeartbeats() {
-    expiredHeartbeats.incr();
+  @Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
+  public int getExpiredHeartbeats() {
+    return datanodeStatistics.getExpiredHeartbeats();
   }
 
   /** @see ClientProtocol#getStats() */
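
Note on the metric change: the deleted code published ExpiredHeartbeats as a MutableCounterInt field that FSNamesystem had to increment itself, while the new code publishes it as a method-level @Metric, which metrics2 samples as a gauge computed from datanodeStatistics at collection time. A hedged sketch of the two styles (the source class name here is illustrative):

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;

// Illustrative metrics2 source; "ExampleSource" is a made-up name.
@Metrics(context="dfs")
class ExampleSource {
  // Old style: a mutable counter injected by the metrics system when
  // the source is registered; every event site must call incr() on it.
  @Metric private MutableCounterInt expiredHeartbeats;

  private volatile int trackedValue = 0;

  // New style: an annotated getter sampled as a gauge; the value is
  // pulled on demand, so no increment call sites are needed here.
  @Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
  public int getExpiredHeartbeats() {
    return trackedValue;
  }
}

This change of metric kind is why the test hunk below switches from assertCounter to assertGauge while keeping the metric name "ExpiredHeartbeats".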
TestDatanodeReport.java
@@ -77,7 +77,7 @@ public void testDatanodeReport() throws Exception {
                               NUM_OF_DATANODES);
 
       Thread.sleep(5000);
-      assertCounter("ExpiredHeartbeats", 1, getMetrics("FSNamesystem"));
+      assertGauge("ExpiredHeartbeats", 1, getMetrics("FSNamesystem"));
     }finally {
       cluster.shutdown();
     }
TestBlockManager.java
@@ -80,7 +80,7 @@ public void setupMockCluster() throws IOException {
         "need to set a dummy value here so it assumes a multi-rack cluster");
     fsn = Mockito.mock(FSNamesystem.class);
     Mockito.doReturn(true).when(fsn).hasWriteLock();
-    bm = new BlockManager(fsn, conf);
+    bm = new BlockManager(fsn, fsn, conf);
   }
 
   private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {
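
Note on the mock: the test keeps its single Mockito mock of FSNamesystem and passes it for both constructor parameters, which type-checks because FSNamesystem implements both Namesystem and FSClusterStats. A minimal sketch of the same trick with illustrative types:

import org.mockito.Mockito;

// Sketch only: stand-ins for the two roles FSNamesystem plays.
interface RoleA { boolean hasWriteLock(); }
interface RoleB { int getTotalLoad(); }
abstract class BothRoles implements RoleA, RoleB { }

class MockDemo {
  static void consume(RoleA a, RoleB b) {
    System.out.println(a.hasWriteLock() + " / " + b.getTotalLoad());
  }

  public static void main(String[] args) {
    BothRoles fsn = Mockito.mock(BothRoles.class);
    Mockito.doReturn(true).when(fsn).hasWriteLock();
    // One mock object satisfies both parameters, just as the test
    // passes "fsn" twice to the BlockManager constructor.
    consume(fsn, fsn);
  }
}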