HDFS-2134. Move DecommissionManager to the blockmanagement package.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1145393 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2011-07-11 23:06:36 +00:00
parent e8eed98feb
commit b9189f7b15
5 changed files with 77 additions and 16 deletions

CHANGES.txt

@@ -543,6 +543,9 @@ Trunk (unreleased changes)
     HDFS-2111. Add tests for ensuring that the DN will start with a few bad
     data directories. (Harsh J Chouraria via todd)
 
+    HDFS-2134. Move DecommissionManager to the blockmanagement package.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

BlockManager.java

@@ -99,6 +99,8 @@ public long getExcessBlocksCount() {
    */
   public final BlocksMap blocksMap;
 
+  private final DatanodeManager datanodeManager;
+
   //
   // Store blocks-->datanodedescriptor(s) map of corrupt replicas
   //
@@ -164,6 +166,7 @@ public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
     setConfigurationParameters(conf);
     blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
+    datanodeManager = new DatanodeManager(fsn);
   }
 
   void setConfigurationParameters(Configuration conf) throws IOException {
@@ -207,13 +210,15 @@ void setConfigurationParameters(Configuration conf) throws IOException {
     FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
   }
 
-  public void activate() {
+  public void activate(Configuration conf) {
     pendingReplications.start();
+    datanodeManager.activate(conf);
   }
 
   public void close() {
     if (pendingReplications != null) pendingReplications.stop();
     blocksMap.close();
+    datanodeManager.close();
   }
 
   public void metaSave(PrintWriter out) {

DatanodeManager.java (new file)

@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.util.Daemon;
/**
* Manage datanodes, include decommission and other activities.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeManager {
  static final Log LOG = LogFactory.getLog(DatanodeManager.class);

  final FSNamesystem namesystem;

  DatanodeManager(final FSNamesystem namesystem) {
    this.namesystem = namesystem;
  }

  private Daemon decommissionthread = null;

  void activate(final Configuration conf) {
    this.decommissionthread = new Daemon(new DecommissionManager(namesystem).new Monitor(
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
    decommissionthread.start();
  }

  void close() {
    if (decommissionthread != null) decommissionthread.interrupt();
  }
}

DecommissionManager.java

@@ -15,18 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.util.CyclicIteration;
 
 /**
  * Manage node decommissioning.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 class DecommissionManager {
   static final Log LOG = LogFactory.getLog(DecommissionManager.class);
@@ -56,6 +60,7 @@ class Monitor implements Runnable {
      * Check decommission status of numNodesPerCheck nodes
      * for every recheckInterval milliseconds.
      */
+    @Override
     public void run() {
       for(; fsnamesystem.isRunning(); ) {
         fsnamesystem.writeLock();

FSNamesystem.java

@@ -266,7 +266,7 @@ private static final void logAuditEvent(UserGroupInformation ugi,
    * <p>
    * Mapping: StorageID -> DatanodeDescriptor
    */
-  NavigableMap<String, DatanodeDescriptor> datanodeMap =
+  public final NavigableMap<String, DatanodeDescriptor> datanodeMap =
     new TreeMap<String, DatanodeDescriptor>();
 
   Random r = new Random();
@@ -324,7 +324,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
   private DNSToSwitchMapping dnsToSwitchMapping;
   private HostsFileReader hostsReader;
-  private Daemon dnthread = null;
 
   private long maxFsObjects = 0;          // maximum number of fs objects
@@ -404,7 +403,7 @@ void activateSecretManager() throws IOException {
    */
   void activate(Configuration conf) throws IOException {
     setBlockTotal();
-    blockManager.activate();
+    blockManager.activate(conf);
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(leaseManager.new Monitor());
     this.replthread = new Daemon(new ReplicationMonitor());
@@ -415,13 +414,6 @@ void activate(Configuration conf) throws IOException {
     this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
     nnrmthread.start();
 
-    this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
-        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
-                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
-        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
-                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
-    dnthread.start();
-
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
             ScriptBasedMapping.class,
@@ -636,7 +628,6 @@ public void close() {
       if (blockManager != null) blockManager.close();
       if (hbthread != null) hbthread.interrupt();
       if (replthread != null) replthread.interrupt();
-      if (dnthread != null) dnthread.interrupt();
       if (smmthread != null) smmthread.interrupt();
       if (dtSecretManager != null) dtSecretManager.stopThreads();
       if (nnrmthread != null) nnrmthread.interrupt();
@@ -661,7 +652,7 @@ public void close() {
   }
 
   /** Is this name system running? */
-  boolean isRunning() {
+  public boolean isRunning() {
     return fsRunning;
   }
@@ -3988,7 +3979,7 @@ short adjustReplication(short replication) {
    * Change, if appropriate, the admin state of a datanode to
    * decommission completed. Return true if decommission is complete.
    */
-  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+  public boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
     assert hasWriteLock();
     //
     // Check to see if all blocks in this decommissioned