diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt
index 5cccebf91d..6e98189135 100644
--- a/hdfs/CHANGES.txt
+++ b/hdfs/CHANGES.txt
@@ -543,6 +543,9 @@ Trunk (unreleased changes)
     HDFS-2111. Add tests for ensuring that the DN will start with a few bad
     data directories. (Harsh J Chouraria via todd)
 
+    HDFS-2134. Move DecommissionManager to the blockmanagement package.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 2510245ba9..25101107ec 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -99,6 +99,8 @@ public long getExcessBlocksCount() {
    */
   public final BlocksMap blocksMap;
 
+  private final DatanodeManager datanodeManager;
+
   //
   // Store blocks-->datanodedescriptor(s) map of corrupt replicas
   //
@@ -164,6 +166,7 @@ public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
     setConfigurationParameters(conf);
     blocksMap = new BlocksMap(capacity, DEFAULT_MAP_LOAD_FACTOR);
+    datanodeManager = new DatanodeManager(fsn);
   }
 
   void setConfigurationParameters(Configuration conf) throws IOException {
@@ -207,13 +210,15 @@ void setConfigurationParameters(Configuration conf) throws IOException {
     FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
   }
 
-  public void activate() {
+  public void activate(Configuration conf) {
     pendingReplications.start();
+    datanodeManager.activate(conf);
   }
 
   public void close() {
     if (pendingReplications != null) pendingReplications.stop();
     blocksMap.close();
+    datanodeManager.close();
   }
 
   public void metaSave(PrintWriter out) {
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
new file mode 100644
index 0000000000..bc33e99056
--- /dev/null
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.util.Daemon;
+
+/**
+ * Manage datanodes, include decommission and other activities.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DatanodeManager {
+  static final Log LOG = LogFactory.getLog(DatanodeManager.class);
+
+  final FSNamesystem namesystem;
+
+  DatanodeManager(final FSNamesystem namesystem) {
+    this.namesystem = namesystem;
+  }
+
+  private Daemon decommissionthread = null;
+
+  void activate(final Configuration conf) {
+    this.decommissionthread = new Daemon(new DecommissionManager(namesystem).new Monitor(
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
+                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
+    decommissionthread.start();
+  }
+
+  void close() {
+    if (decommissionthread != null) decommissionthread.interrupt();
+  }
+}
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
similarity index 90%
rename from hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
rename to hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 0c7c4ca1ff..f74e1cbba7 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/DecommissionManager.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -15,18 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.util.CyclicIteration;
 
 /**
  * Manage node decommissioning.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 class DecommissionManager {
   static final Log LOG = LogFactory.getLog(DecommissionManager.class);
 
@@ -56,6 +60,7 @@ class Monitor implements Runnable {
      * Check decommission status of numNodesPerCheck nodes
      * for every recheckInterval milliseconds.
      */
+    @Override
     public void run() {
       for(; fsnamesystem.isRunning(); ) {
         fsnamesystem.writeLock();
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index f1e887f477..55b9d0f4da 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -266,7 +266,7 @@ private static final void logAuditEvent(UserGroupInformation ugi,
    *
    * Mapping: StorageID -> DatanodeDescriptor
    */
-  NavigableMap<String, DatanodeDescriptor> datanodeMap = 
+  public final NavigableMap<String, DatanodeDescriptor> datanodeMap = 
     new TreeMap<String, DatanodeDescriptor>();
 
   Random r = new Random();
@@ -324,7 +324,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
   private DNSToSwitchMapping dnsToSwitchMapping;
 
   private HostsFileReader hostsReader;
-  private Daemon dnthread = null;
 
   private long maxFsObjects = 0;          // maximum number of fs objects
 
@@ -404,7 +403,7 @@ void activateSecretManager() throws IOException {
    */
   void activate(Configuration conf) throws IOException {
     setBlockTotal();
-    blockManager.activate();
+    blockManager.activate(conf);
     this.hbthread = new Daemon(new HeartbeatMonitor());
     this.lmthread = new Daemon(leaseManager.new Monitor());
     this.replthread = new Daemon(new ReplicationMonitor());
@@ -415,13 +414,6 @@ void activate(Configuration conf) throws IOException {
     this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
     nnrmthread.start();
 
-    this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
-        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
-                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
-        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
-                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
-    dnthread.start();
-
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
             ScriptBasedMapping.class,
@@ -636,7 +628,6 @@ public void close() {
       if (blockManager != null) blockManager.close();
       if (hbthread != null) hbthread.interrupt();
       if (replthread != null) replthread.interrupt();
-      if (dnthread != null) dnthread.interrupt();
       if (smmthread != null) smmthread.interrupt();
       if (dtSecretManager != null) dtSecretManager.stopThreads();
       if (nnrmthread != null) nnrmthread.interrupt();
@@ -661,7 +652,7 @@ public void close() {
   }
 
   /** Is this name system running? */
-  boolean isRunning() {
+  public boolean isRunning() {
     return fsRunning;
   }
 
@@ -3988,7 +3979,7 @@ short adjustReplication(short replication) {
    * Change, if appropriate, the admin state of a datanode to
    * decommission completed. Return true if decommission is complete.
    */
-  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+  public boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
     assert hasWriteLock();
     //
     // Check to see if all blocks in this decommissioned