HDFS-7375. Move FSClusterStats to o.a.h.h.hdfs.server.blockmanagement. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2014-11-11 18:22:40 -08:00
parent 163bb55067
commit 46f6f9d60d
13 changed files with 183 additions and 152 deletions

View File

@ -353,6 +353,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7381. Decouple the management of block id and gen stamps from
FSNamesystem. (wheat9)
HDFS-7375. Move FSClusterStats to o.a.h.h.hdfs.server.blockmanagement.
(wheat9)
OPTIMIZATIONS
BUG FIXES

View File

@ -66,7 +66,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@ -111,7 +110,7 @@ public class BlockManager {
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
private final PendingDataNodeMessages pendingDNMessages =
new PendingDataNodeMessages();
@ -264,9 +263,9 @@ public int getPendingDataNodeMessageCount() {
/** Check whether name system is running before terminating */
private boolean checkNSRunning = true;
public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
final Configuration conf) throws IOException {
public BlockManager(final Namesystem namesystem, final Configuration conf)
throws IOException {
this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
@ -281,8 +280,9 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
blocksMap = new BlocksMap(
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
blockplacement = BlockPlacementPolicy.getInstance(
conf, stats, datanodeManager.getNetworkTopology(),
datanodeManager.getHost2DatanodeMap());
conf, datanodeManager.getFSClusterStats(),
datanodeManager.getNetworkTopology(),
datanodeManager.getHost2DatanodeMap());
storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,

View File

@ -33,7 +33,6 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.ReflectionUtils;

View File

@ -31,7 +31,6 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;

View File

@ -23,7 +23,6 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.apache.hadoop.net.Node;

View File

@ -66,6 +66,7 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
private final HeartbeatManager heartbeatManager;
private final FSClusterStats fsClusterStats;
private Daemon decommissionthread = null;
/**
@ -169,7 +170,7 @@ public class DatanodeManager {
* directives that we've already sent.
*/
private final long timeBetweenResendingCachingDirectivesMs;
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@ -178,6 +179,7 @@ public class DatanodeManager {
networktopology = NetworkTopology.getInstance(conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
this.fsClusterStats = newFSClusterStats();
this.defaultXferPort = NetUtils.createSocketAddr(
conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
@ -329,6 +331,11 @@ HeartbeatManager getHeartbeatManager() {
return heartbeatManager;
}
/**
 * Returns the {@link FSClusterStats} instance stored in this manager's
 * {@code fsClusterStats} field.
 *
 * NOTE(review): the method is annotated {@code @VisibleForTesting}, yet the
 * BlockManager constructor in this same change calls it to hand the stats to
 * BlockPlacementPolicy.getInstance() -- confirm the annotation is still
 * accurate.
 *
 * @return the cluster statistics object held by this manager
 */
@VisibleForTesting
public FSClusterStats getFSClusterStats() {
return fsClusterStats;
}
/** @return the datanode statistics. */
public DatanodeStatistics getDatanodeStatistics() {
return heartbeatManager;
@ -1595,5 +1602,36 @@ public void clearPendingCachingCommands() {
public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
this.shouldSendCachingCommands = shouldSendCachingCommands;
}
/**
 * Creates the {@link FSClusterStats} view backed by this manager.
 *
 * Load and node-count figures are read from the heartbeat manager each time
 * they are requested; the stale-node flag is answered by
 * {@code shouldAvoidStaleDataNodesForWrite()} on this DatanodeManager.
 *
 * @return a new FSClusterStats whose getters delegate to this manager
 */
FSClusterStats newFSClusterStats() {
  return new FSClusterStats() {
    @Override
    public int getTotalLoad() {
      return heartbeatManager.getXceiverCount();
    }

    @Override
    public boolean isAvoidingStaleDataNodesForWrite() {
      return shouldAvoidStaleDataNodesForWrite();
    }

    @Override
    public int getNumDatanodesInService() {
      return heartbeatManager.getNumDatanodesInService();
    }

    @Override
    public double getInServiceXceiverAverage() {
      final int inServiceNodes = getNumDatanodesInService();
      if (inServiceNodes == 0) {
        // No in-service nodes: report zero instead of dividing by zero.
        return 0.0;
      }
      final int inServiceXceivers =
          heartbeatManager.getInServiceXceiverCount();
      return (double) inServiceXceivers / inServiceNodes;
    }
  };
}
}

View File

@ -15,12 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* This interface is used for retrieving the load related statistics of
/**
* This interface is used for retrieving the load related statistics of
* the cluster.
*/
@InterfaceAudience.Private
@ -28,17 +28,17 @@ public interface FSClusterStats {
/**
* an indication of the total load of the cluster.
*
*
* @return a count of the total number of block transfers and block
* writes that are currently occurring on the cluster.
*/
public int getTotalLoad();
/**
* Indicate whether or not the cluster is now avoiding
* Indicate whether or not the cluster is now avoiding
* to use stale DataNodes for writing.
*
* @return True if the cluster is currently avoiding using stale DataNodes
*
* @return True if the cluster is currently avoiding using stale DataNodes
* for writing targets, and false otherwise.
*/
public boolean isAvoidingStaleDataNodesForWrite();
@ -52,11 +52,9 @@ public interface FSClusterStats {
/**
* an indication of the average load of non-decommission(ing|ed) nodes
* eligible for block placement
*
*
* @return average of the in service number of block transfers and block
* writes that are currently occurring on the cluster.
*/
public double getInServiceXceiverAverage();
}

View File

@ -318,8 +318,8 @@
*/
@InterfaceAudience.Private
@Metrics(context="dfs")
public class FSNamesystem implements Namesystem, FSClusterStats,
FSNamesystemMBean, NameNodeMXBean {
public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NameNodeMXBean {
public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
private static final ThreadLocal<StringBuilder> auditBuffer =
@ -765,7 +765,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
this.blockManager = new BlockManager(this, this, conf);
this.blockManager = new BlockManager(this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.blockIdManager = new BlockIdManager(blockManager);
@ -7818,28 +7818,6 @@ public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
this.nnResourceChecker = nnResourceChecker;
}
@Override
public boolean isAvoidingStaleDataNodesForWrite() {
return this.blockManager.getDatanodeManager()
.shouldAvoidStaleDataNodesForWrite();
}
@Override // FSClusterStats
public int getNumDatanodesInService() {
return datanodeStatistics.getNumDatanodesInService();
}
@Override // for block placement strategy
public double getInServiceXceiverAverage() {
double avgLoad = 0;
final int nodes = getNumDatanodesInService();
if (nodes != 0) {
final int xceivers = datanodeStatistics.getInServiceXceiverCount();
avgLoad = (double)xceivers/nodes;
}
return avgLoad;
}
public SnapshotManager getSnapshotManager() {
return snapshotManager;
}

View File

@ -81,19 +81,17 @@ public class TestBlockManager {
private static final int NUM_TEST_ITERS = 30;
private static final int BLOCK_SIZE = 64*1024;
private Configuration conf;
private FSNamesystem fsn;
private BlockManager bm;
@Before
public void setupMockCluster() throws IOException {
conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
"need to set a dummy value here so it assumes a multi-rack cluster");
Configuration conf = new HdfsConfiguration();
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "need to set a dummy value here so it assumes a multi-rack cluster");
fsn = Mockito.mock(FSNamesystem.class);
Mockito.doReturn(true).when(fsn).hasWriteLock();
bm = new BlockManager(fsn, fsn, conf);
bm = new BlockManager(fsn, conf);
final String[] racks = {
"/rackA",
"/rackA",

View File

@ -56,7 +56,6 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -1145,9 +1144,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasWriteLock()).thenReturn(true);
FSClusterStats mockStats = mock(FSClusterStats.class);
BlockManager bm =
new BlockManager(mockNS, mockStats, new HdfsConfiguration());
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());
@ -1193,9 +1190,7 @@ public void testAddStoredBlockDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
FSClusterStats mockStats = mock(FSClusterStats.class);
BlockManager bm =
new BlockManager(mockNS, mockStats, new HdfsConfiguration());
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());
@ -1248,9 +1243,7 @@ public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
FSClusterStats mockStats = mock(FSClusterStats.class);
BlockManager bm =
new BlockManager(mockNS, mockStats, new HdfsConfiguration());
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());

View File

@ -130,7 +130,8 @@ public void testChooseTargetWithDecomNodes() throws IOException {
final int load = 2 + 4 + 4;
FSNamesystem fsn = namenode.getNamesystem();
assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
assertEquals((double)load/6, dnManager.getFSClusterStats()
.getInServiceXceiverAverage(), EPSILON);
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
// returns false
@ -139,7 +140,8 @@ public void testChooseTargetWithDecomNodes() throws IOException {
dnManager.startDecommission(d);
d.setDecommissioned();
}
assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
assertEquals((double)load/3, dnManager.getFSClusterStats()
.getInServiceXceiverAverage(), EPSILON);
// update references of writer DN to update the de-commissioned state
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();

View File

@ -17,18 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@ -43,6 +32,18 @@
import org.junit.Test;
import org.mortbay.util.ajax.JSON;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Class for testing {@link NameNodeMXBean} implementation
*/
@ -62,14 +63,11 @@ public class TestNameNodeMXBean {
public void testNameNodeMXBeanInfo() throws Exception {
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
FSNamesystem fsn = cluster.getNameNode().namesystem;
@ -77,29 +75,6 @@ public void testNameNodeMXBeanInfo() throws Exception {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
"Hadoop:service=NameNode,name=NameNodeInfo");
// Define include file to generate deadNodes metrics
FileSystem localFileSys = FileSystem.getLocal(conf);
Path workingDir = localFileSys.getWorkingDirectory();
Path dir = new Path(workingDir,
"build/test/data/temp/TestNameNodeMXBean");
Path includeFile = new Path(dir, "include");
assertTrue(localFileSys.mkdirs(dir));
StringBuilder includeHosts = new StringBuilder();
for(DataNode dn : cluster.getDataNodes()) {
includeHosts.append(dn.getDisplayName()).append("\n");
}
DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
cluster.stopDataNode(0);
while (fsn.getNumDatanodesInService() != 2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
}
// get attribute "ClusterId"
String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
assertEquals(fsn.getClusterId(), clusterId);
@ -127,8 +102,7 @@ public void testNameNodeMXBeanInfo() throws Exception {
// get attribute percentremaining
Float percentremaining = (Float) (mbs.getAttribute(mxbeanName,
"PercentRemaining"));
assertEquals(fsn.getPercentRemaining(), percentremaining
.floatValue(), DELTA);
assertEquals(fsn.getPercentRemaining(), percentremaining, DELTA);
// get attribute Totalblocks
Long totalblocks = (Long) (mbs.getAttribute(mxbeanName, "TotalBlocks"));
assertEquals(fsn.getTotalBlocks(), totalblocks.longValue());
@ -151,15 +125,6 @@ public void testNameNodeMXBeanInfo() throws Exception {
String deadnodeinfo = (String) (mbs.getAttribute(mxbeanName,
"DeadNodes"));
assertEquals(fsn.getDeadNodes(), deadnodeinfo);
Map<String, Map<String, Object>> deadNodes =
(Map<String, Map<String, Object>>) JSON.parse(deadnodeinfo);
assertTrue(deadNodes.size() > 0);
for (Map<String, Object> deadNode : deadNodes.values()) {
assertTrue(deadNode.containsKey("lastContact"));
assertTrue(deadNode.containsKey("decommissioned"));
assertTrue(deadNode.containsKey("xferaddr"));
}
// get attribute NodeUsage
String nodeUsage = (String) (mbs.getAttribute(mxbeanName,
"NodeUsage"));
@ -233,4 +198,63 @@ public void testNameNodeMXBeanInfo() throws Exception {
}
}
}
/**
 * Checks that the DeadNodes attribute of the NameNodeInfo MXBean matches
 * {@code FSNamesystem#getDeadNodes()} and that every dead-node entry carries
 * the "lastContact", "decommissioned" and "xferaddr" keys.
 *
 * The test starts three DataNodes, writes an include file listing all of
 * them, refreshes the node list, stops DataNode 0 and then waits until the
 * DatanodeManager reports exactly two live DataNodes. The wait is bounded:
 * if the live count has not dropped in time the test fails with a message
 * instead of polling forever, so the finally block still shuts the cluster
 * down.
 */
@SuppressWarnings({ "unchecked" })
@Test
public void testLastContactTime() throws Exception {
  Configuration conf = new Configuration();
  // Small heartbeat/recheck values; presumably chosen so the stopped
  // DataNode is declared dead quickly -- confirm against DFSConfigKeys docs.
  conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
  conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
  MiniDFSCluster cluster = null;

  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    cluster.waitActive();

    FSNamesystem fsn = cluster.getNameNode().namesystem;

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName mxbeanName = new ObjectName(
        "Hadoop:service=NameNode,name=NameNodeInfo");

    // Define include file to generate deadNodes metrics
    FileSystem localFileSys = FileSystem.getLocal(conf);
    Path workingDir = localFileSys.getWorkingDirectory();
    Path dir = new Path(workingDir,
        "build/test/data/temp/TestNameNodeMXBean");
    Path includeFile = new Path(dir, "include");
    assertTrue(localFileSys.mkdirs(dir));
    StringBuilder includeHosts = new StringBuilder();
    for (DataNode dn : cluster.getDataNodes()) {
      includeHosts.append(dn.getDisplayName()).append("\n");
    }
    DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
    conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
    fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);

    cluster.stopDataNode(0);

    // Poll once per second until the stopped node is no longer counted as
    // live, but give up after a fixed deadline. sleepUninterruptibly ignores
    // interrupts, so an unbounded loop here could never be cancelled.
    final long deadlineNanos =
        System.nanoTime() + TimeUnit.SECONDS.toNanos(120);
    while (fsn.getBlockManager().getDatanodeManager().getNumLiveDataNodes()
        != 2) {
      // Subtraction-based comparison stays correct if nanoTime wraps.
      assertTrue("Timed out waiting for the stopped DataNode to be "
          + "reported dead", System.nanoTime() - deadlineNanos < 0);
      Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
    }

    // get attribute deadnodeinfo
    String deadnodeinfo = (String) (mbs.getAttribute(mxbeanName,
        "DeadNodes"));
    assertEquals(fsn.getDeadNodes(), deadnodeinfo);
    Map<String, Map<String, Object>> deadNodes =
        (Map<String, Map<String, Object>>) JSON.parse(deadnodeinfo);
    assertTrue(deadNodes.size() > 0);
    for (Map<String, Object> deadNode : deadNodes.values()) {
      assertTrue(deadNode.containsKey("lastContact"));
      assertTrue(deadNode.containsKey("decommissioned"));
      assertTrue(deadNode.containsKey("xferaddr"));
    }
  } finally {
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
}

View File

@ -193,11 +193,7 @@ public void testXceiverCount() throws Exception {
int expectedTotalLoad = nodes; // xceiver server adds 1 to load
int expectedInServiceNodes = nodes;
int expectedInServiceLoad = nodes;
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
namesystem.getInServiceXceiverAverage(), EPSILON);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
// shutdown half the nodes and force a heartbeat check to ensure
// counts are accurate
@ -209,7 +205,7 @@ public void testXceiverCount() throws Exception {
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
expectedInServiceNodes--;
assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
}
// restart the nodes to verify that counts are correct after
@ -219,11 +215,7 @@ public void testXceiverCount() throws Exception {
datanodes = cluster.getDataNodes();
expectedInServiceNodes = nodes;
assertEquals(nodes, datanodes.size());
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
namesystem.getInServiceXceiverAverage(), EPSILON);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
// create streams and hsync to force datastreamers to start
DFSOutputStream[] streams = new DFSOutputStream[fileCount];
@ -239,12 +231,7 @@ public void testXceiverCount() throws Exception {
}
// force nodes to send load update
triggerHeartbeats(datanodes);
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes,
namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
namesystem.getInServiceXceiverAverage(), EPSILON);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
// decomm a few nodes, subtract their load from the expected load,
// trigger heartbeat to force load update
@ -256,12 +243,7 @@ public void testXceiverCount() throws Exception {
dnm.startDecommission(dnd);
DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
Thread.sleep(100);
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes,
namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
namesystem.getInServiceXceiverAverage(), EPSILON);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
}
// check expected load while closing each stream. recalc expected
@ -289,12 +271,7 @@ public void testXceiverCount() throws Exception {
}
triggerHeartbeats(datanodes);
// verify node count and loads
assertEquals(nodes, namesystem.getNumLiveDataNodes());
assertEquals(expectedInServiceNodes,
namesystem.getNumDatanodesInService());
assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
namesystem.getInServiceXceiverAverage(), EPSILON);
checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
}
// shutdown each node, verify node counts based on decomm state
@ -310,26 +287,49 @@ public void testXceiverCount() throws Exception {
if (i >= fileRepl) {
expectedInServiceNodes--;
}
assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
// live nodes always report load of 1. no nodes is load 0
double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
assertEquals((double)expectedXceiverAvg,
namesystem.getInServiceXceiverAverage(), EPSILON);
getInServiceXceiverAverage(namesystem), EPSILON);
}
// final sanity check
assertEquals(0, namesystem.getNumLiveDataNodes());
assertEquals(0, namesystem.getNumDatanodesInService());
assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
checkClusterHealth(0, namesystem, 0.0, 0, 0.0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
 * Asserts the live-node count, in-service node count, total load and
 * in-service xceiver average currently reported for the cluster.
 *
 * @param numOfLiveNodes expected value of namesystem.getNumLiveDataNodes()
 * @param namesystem the namesystem under test
 * @param expectedTotalLoad expected value of namesystem.getTotalLoad()
 * @param expectedInServiceNodes expected number of in-service DataNodes
 * @param expectedInServiceLoad expected load summed over in-service nodes;
 *        divided by expectedInServiceNodes to obtain the expected average
 */
private static void checkClusterHealth(
    int numOfLiveNodes,
    FSNamesystem namesystem, double expectedTotalLoad,
    int expectedInServiceNodes, double expectedInServiceLoad) {
  // With zero in-service nodes the expected average is 0.0, which also
  // avoids dividing by zero when computing the expectation.
  final double expectedAverage = (expectedInServiceNodes == 0)
      ? 0.0
      : expectedInServiceLoad / expectedInServiceNodes;

  assertEquals(numOfLiveNodes, namesystem.getNumLiveDataNodes());
  assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
  assertEquals(expectedTotalLoad, namesystem.getTotalLoad(), EPSILON);
  assertEquals(expectedAverage, getInServiceXceiverAverage(namesystem),
      EPSILON);
}
/**
 * Reads the in-service DataNode count from the FSClusterStats object exposed
 * by the namesystem's DatanodeManager.
 *
 * @param fsn the namesystem whose block manager is queried
 * @return the value of FSClusterStats#getNumDatanodesInService()
 */
private static int getNumDNInService(FSNamesystem fsn) {
return fsn.getBlockManager().getDatanodeManager().getFSClusterStats()
.getNumDatanodesInService();
}
/**
 * Reads the in-service xceiver average from the FSClusterStats object exposed
 * by the namesystem's DatanodeManager.
 *
 * @param fsn the namesystem whose block manager is queried
 * @return the value of FSClusterStats#getInServiceXceiverAverage()
 */
private static double getInServiceXceiverAverage(FSNamesystem fsn) {
return fsn.getBlockManager().getDatanodeManager().getFSClusterStats()
.getInServiceXceiverAverage();
}
private void triggerHeartbeats(List<DataNode> datanodes)
throws IOException, InterruptedException {
for (DataNode dn : datanodes) {