From 9a9fbbe145432136d85d2d2e133364c7e79e65e1 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Wed, 20 Mar 2019 11:12:49 +0530
Subject: [PATCH] HDFS-14351. RBF: Optimize configuration item resolving for monitor namenode. Contributed by He Xiaoqiao and Inigo Goiri.

---
 .../hdfs/server/federation/router/Router.java     |  38 ++-
 .../hdfs/server/federation/MockNamenode.java      | 225 ++++++++++++++
 .../router/TestRouterNamenodeMonitoring.java      | 278 +++++++++++++-----
 3 files changed, 444 insertions(+), 97 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
index 7d112f90de..9e18ebfb4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Router.java
@@ -497,27 +497,25 @@ public InetSocketAddress getHttpServerAddress() {
     }
 
     // Create heartbeat services for a list specified by the admin
-    String namenodes = this.conf.get(
+    Collection<String> namenodes = this.conf.getTrimmedStringCollection(
         RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE);
-    if (namenodes != null) {
-      for (String namenode : namenodes.split(",")) {
-        String[] namenodeSplit = namenode.split("\\.");
-        String nsId = null;
-        String nnId = null;
-        if (namenodeSplit.length == 2) {
-          nsId = namenodeSplit[0];
-          nnId = namenodeSplit[1];
-        } else if (namenodeSplit.length == 1) {
-          nsId = namenode;
-        } else {
-          LOG.error("Wrong Namenode to monitor: {}", namenode);
-        }
-        if (nsId != null) {
-          NamenodeHeartbeatService heartbeatService =
-              createNamenodeHeartbeatService(nsId, nnId);
-          if (heartbeatService != null) {
-            ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
-          }
+    for (String namenode : namenodes) {
+      String[] namenodeSplit = namenode.split("\\.");
+      String nsId = null;
+      String nnId = null;
+      if (namenodeSplit.length == 2) {
+        nsId = namenodeSplit[0];
+        nnId = namenodeSplit[1];
+      } else if (namenodeSplit.length == 1) {
+        nsId = namenode;
+      } else {
+        LOG.error("Wrong Namenode to monitor: {}", namenode);
+      }
+      if (nsId != null) {
+        NamenodeHeartbeatService heartbeatService =
+            createNamenodeHeartbeatService(nsId, nnId);
+        if (heartbeatService != null) {
+          ret.put(heartbeatService.getNamenodeDesc(), heartbeatService);
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
new file mode 100644
index 0000000000..9b58fff085
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceStatus;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
+import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import com.google.protobuf.BlockingService;
+
+
+/**
+ * Mock for the network interfaces (e.g., RPC and HTTP) of a Namenode. This is
+ * used by the Routers in a mock cluster.
+ */
+public class MockNamenode {
+
+  /** Mock implementation of the Namenode. */
+  private final NamenodeProtocols mockNn;
+
+  /** HA state of the Namenode. */
+  private HAServiceState haState = HAServiceState.STANDBY;
+
+  /** RPC server of the Namenode that redirects calls to the mock. */
+  private Server rpcServer;
+  /** HTTP server of the Namenode that redirects calls to the mock. */
+  private HttpServer2 httpServer;
+
+
+  public MockNamenode() throws Exception {
+    Configuration conf = new Configuration();
+
+    this.mockNn = mock(NamenodeProtocols.class);
+    setupMock();
+    setupRPCServer(conf);
+    setupHTTPServer(conf);
+  }
+
+  /**
+   * Set up the mock of the Namenode. It offers the basic functionality for
+   * Routers to get the status.
+   * @throws IOException If the mock cannot be set up.
+   */
+  protected void setupMock() throws IOException {
+    NamespaceInfo nsInfo = new NamespaceInfo(1, "clusterId", "bpId", 1);
+    when(mockNn.versionRequest()).thenReturn(nsInfo);
+
+    when(mockNn.getServiceStatus()).thenAnswer(new Answer<HAServiceStatus>() {
+      @Override
+      public HAServiceStatus answer(InvocationOnMock invocation)
+          throws Throwable {
+        HAServiceStatus haStatus = new HAServiceStatus(getHAServiceState());
+        haStatus.setNotReadyToBecomeActive("");
+        return haStatus;
+      }
+    });
+  }
+
+  /**
+   * Set up the RPC server of the Namenode that redirects calls to the mock.
+   * @param conf Configuration of the server.
+   * @throws IOException If the RPC server cannot be set up.
+   */
+  private void setupRPCServer(final Configuration conf) throws IOException {
+    RPC.setProtocolEngine(
+        conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
+    ClientNamenodeProtocolServerSideTranslatorPB
+        clientNNProtoXlator =
+            new ClientNamenodeProtocolServerSideTranslatorPB(mockNn);
+    BlockingService clientNNPbService =
+        ClientNamenodeProtocol.newReflectiveBlockingService(
+            clientNNProtoXlator);
+
+    rpcServer = new RPC.Builder(conf)
+        .setProtocol(ClientNamenodeProtocolPB.class)
+        .setInstance(clientNNPbService)
+        .setBindAddress("0.0.0.0")
+        .setPort(0)
+        .build();
+
+    NamenodeProtocolServerSideTranslatorPB nnProtoXlator =
+        new NamenodeProtocolServerSideTranslatorPB(mockNn);
+    BlockingService nnProtoPbService =
+        NamenodeProtocolService.newReflectiveBlockingService(
+            nnProtoXlator);
+    DFSUtil.addPBProtocol(
+        conf, NamenodeProtocolPB.class, nnProtoPbService, rpcServer);
+
+    DatanodeProtocolServerSideTranslatorPB dnProtoPbXlator =
+        new DatanodeProtocolServerSideTranslatorPB(mockNn, 1000);
+    BlockingService dnProtoPbService =
+        DatanodeProtocolService.newReflectiveBlockingService(
+            dnProtoPbXlator);
+    DFSUtil.addPBProtocol(
+        conf, DatanodeProtocolPB.class, dnProtoPbService, rpcServer);
+
+    HAServiceProtocolServerSideTranslatorPB haServiceProtoXlator =
+        new HAServiceProtocolServerSideTranslatorPB(mockNn);
+    BlockingService haProtoPbService =
+        HAServiceProtocolService.newReflectiveBlockingService(
+            haServiceProtoXlator);
+    DFSUtil.addPBProtocol(
+        conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);
+
+    rpcServer.start();
+  }
+
+  /**
+   * Set up the HTTP server of the Namenode that redirects calls to the mock.
+   * @param conf Configuration of the server.
+   * @throws IOException If the HTTP server cannot be set up.
+   */
+  private void setupHTTPServer(Configuration conf) throws IOException {
+    HttpServer2.Builder builder = new HttpServer2.Builder()
+        .setName("hdfs")
+        .setConf(conf)
+        .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
+        .addEndpoint(URI.create("http://0.0.0.0:0"));
+    httpServer = builder.build();
+    httpServer.start();
+  }
+
+  /**
+   * Get the RPC port for the Mock Namenode.
+   * @return RPC port.
+   */
+  public int getRPCPort() {
+    return rpcServer.getListenerAddress().getPort();
+  }
+
+  /**
+   * Get the HTTP port for the Mock Namenode.
+   * @return HTTP port.
+   */
+  public int getHTTPPort() {
+    return httpServer.getConnectorAddress(0).getPort();
+  }
+
+  /**
+   * Get the Mock core. This is used to extend the mock.
+   * @return Mock Namenode protocol to be extended.
+   */
+  public NamenodeProtocols getMock() {
+    return mockNn;
+  }
+
+  /**
+   * Get the HA state of the Mock Namenode.
+   * @return HA state (ACTIVE or STANDBY).
+   */
+  public HAServiceState getHAServiceState() {
+    return haState;
+  }
+
+  /**
+   * Show the Mock Namenode as Active.
+   */
+  public void transitionToActive() {
+    this.haState = HAServiceState.ACTIVE;
+  }
+
+  /**
+   * Show the Mock Namenode as Standby.
+   */
+  public void transitionToStandby() {
+    this.haState = HAServiceState.STANDBY;
+  }
+
+  /**
+   * Stop the Mock Namenode. It stops all the servers.
+   * @throws Exception If it cannot stop the Namenode.
+   */
+  public void stop() throws Exception {
+    if (rpcServer != null) {
+      rpcServer.stop();
+    }
+    if (httpServer != null) {
+      httpServer.stop();
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index 0bea11c017..1224fa2ddc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -17,127 +17,251 @@
  */
 package org.apache.hadoop.hdfs.server.federation.router;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE;
-import static org.junit.Assert.assertEquals;
+import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.server.federation.MockNamenode;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
-import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test namenodes monitor behavior in the Router.
  */
 public class TestRouterNamenodeMonitoring {
 
-  private static StateStoreDFSCluster cluster;
-  private static RouterContext routerContext;
-  private static MembershipNamenodeResolver resolver;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterNamenodeMonitoring.class);
 
-  private String ns0;
-  private String ns1;
+
+  /** Router for the test. */
+  private Router router;
+  /** Namenodes in the cluster. */
+  private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
+  /** Nameservices in the federated cluster. */
+  private List<String> nsIds = asList("ns0", "ns1");
+
+  /** Time the test starts. */
   private long initializedTime;
+
   @Before
-  public void setUp() throws Exception {
-    // Build and start a federated cluster with HA enabled
-    cluster = new StateStoreDFSCluster(true, 2);
-    // Enable heartbeat service and local heartbeat
-    Configuration routerConf = new RouterConfigBuilder()
-        .stateStore()
-        .admin()
-        .rpc()
-        .enableLocalHeartbeat(true)
-        .heartbeat()
-        .build();
-
-    // Specify local node (ns0.nn1) to monitor
-    StringBuilder sb = new StringBuilder();
-    ns0 = cluster.getNameservices().get(0);
-    NamenodeContext context = cluster.getNamenodes(ns0).get(1);
-    routerConf.set(DFS_NAMESERVICE_ID, ns0);
-    routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId());
-
-    // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
-    sb = new StringBuilder();
-    ns1 = cluster.getNameservices().get(1);
-    for (NamenodeContext ctx : cluster.getNamenodes(ns1)) {
-      String suffix = ctx.getConfSuffix();
-      if (sb.length() != 0) {
-        sb.append(",");
+  public void setup() throws Exception {
+    LOG.info("Initialize the Mock Namenodes to monitor");
+    for (String nsId : nsIds) {
+      nns.put(nsId, new HashMap<>());
+      for (String nnId : asList("nn0", "nn1")) {
+        nns.get(nsId).put(nnId, new MockNamenode());
       }
-      sb.append(suffix);
     }
-    // override with the namenodes: ns1.nn0,ns1.nn1
-    routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString());
 
-    cluster.addRouterOverrides(routerConf);
-    cluster.startCluster();
-    cluster.startRouters();
-    cluster.waitClusterUp();
+    LOG.info("Set nn0 to active for all nameservices");
+    for (Map<String, MockNamenode> nnNS : nns.values()) {
+      nnNS.get("nn0").transitionToActive();
+      nnNS.get("nn1").transitionToStandby();
+    }
 
-    routerContext = cluster.getRandomRouter();
-    resolver = (MembershipNamenodeResolver) routerContext.getRouter()
-        .getNamenodeResolver();
     initializedTime = Time.now();
   }
 
   @After
-  public void tearDown() {
-    if (cluster != null) {
-      cluster.stopRouter(routerContext);
-      cluster.shutdown();
-      cluster = null;
+  public void cleanup() throws Exception {
+    for (Map<String, MockNamenode> nnNS : nns.values()) {
+      for (MockNamenode nn : nnNS.values()) {
+        nn.stop();
+      }
     }
+    nns.clear();
+
+    if (router != null) {
+      router.stop();
+    }
+  }
+
+  /**
+   * Get the configuration of the cluster which contains all the Namenodes and
+   * their addresses.
+   * @return Configuration containing all the Namenodes.
+   */
+  private Configuration getNamenodesConfig() {
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES,
+        StringUtils.join(",", nns.keySet()));
+    for (String nsId : nns.keySet()) {
+      Set<String> nnIds = nns.get(nsId).keySet();
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
+      sb.append(".").append(nsId);
+      conf.set(sb.toString(), StringUtils.join(",", nnIds));
+
+      for (String nnId : nnIds) {
+        final MockNamenode nn = nns.get(nsId).get(nnId);
+
+        sb = new StringBuilder();
+        sb.append(DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+        sb.append(".").append(nsId);
+        sb.append(".").append(nnId);
+        conf.set(sb.toString(), "localhost:" + nn.getRPCPort());
+
+        sb = new StringBuilder();
+        sb.append(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+        sb.append(".").append(nsId);
+        sb.append(".").append(nnId);
+        conf.set(sb.toString(), "localhost:" + nn.getHTTPPort());
+      }
+    }
+    return conf;
   }
 
   @Test
   public void testNamenodeMonitoring() throws Exception {
-    // Set nn0 to active for all nameservices
-    for (String ns : cluster.getNameservices()) {
-      cluster.switchToActive(ns, "nn0");
-      cluster.switchToStandby(ns, "nn1");
-    }
+    Configuration nsConf = getNamenodesConfig();
 
-    Collection<NamenodeHeartbeatService> heartbeatServices = routerContext
-        .getRouter().getNamenodeHeartbeatServices();
-    // manually trigger the heartbeat
+    // Set up the State Store for the Router to use
+    Configuration stateStoreConfig = getStateStoreConfiguration();
+    stateStoreConfig.setClass(
+        RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
+        MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
+    stateStoreConfig.setClass(
+        RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
+        MountTableResolver.class, FileSubclusterResolver.class);
+
+    Configuration routerConf = new RouterConfigBuilder(nsConf)
+        .enableLocalHeartbeat(true)
+        .heartbeat()
+        .stateStore()
+        .rpc()
+        .build();
+
+    // Specify namenodes (ns1.nn0,ns1.nn1) to monitor
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE,
+        "ns1.nn0,ns1.nn1");
+    routerConf.addResource(stateStoreConfig);
+
+    // Specify local node (ns0.nn1) to monitor
+    routerConf.set(DFSConfigKeys.DFS_NAMESERVICE_ID, "ns0");
+    routerConf.set(DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY, "nn1");
+
+    // Start the Router with the namenodes to monitor
+    router = new Router();
+    router.init(routerConf);
+    router.start();
+
+    // Manually trigger the heartbeat and update the values
+    Collection<NamenodeHeartbeatService> heartbeatServices =
+        router.getNamenodeHeartbeatServices();
     for (NamenodeHeartbeatService service : heartbeatServices) {
       service.periodicInvoke();
     }
-
+    MembershipNamenodeResolver resolver =
+        (MembershipNamenodeResolver) router.getNamenodeResolver();
     resolver.loadCache(true);
 
-    List<? extends FederationNamenodeContext> namespaceInfo0 =
-        resolver.getNamenodesForNameserviceId(ns0);
-    List<? extends FederationNamenodeContext> namespaceInfo1 =
-        resolver.getNamenodesForNameserviceId(ns1);
-    // The modified date won't be updated in ns0.nn0 since it isn't
-    // monitored by the Router.
-    assertEquals("nn0", namespaceInfo0.get(1).getNamenodeId());
-    assertTrue(namespaceInfo0.get(1).getDateModified() < initializedTime);
+    // Check that the monitored values are expected
+    final List<FederationNamenodeContext> namespaceInfo = new ArrayList<>();
+    for (String nsId : nns.keySet()) {
+      List<? extends FederationNamenodeContext> nnReports =
+          resolver.getNamenodesForNameserviceId(nsId);
+      namespaceInfo.addAll(nnReports);
+    }
+    for (FederationNamenodeContext nnInfo : namespaceInfo) {
+      long modTime = nnInfo.getDateModified();
+      long diff = modTime - initializedTime;
+      if ("ns0".equals(nnInfo.getNameserviceId()) &&
+          "nn0".equals(nnInfo.getNamenodeId())) {
+        // The modified date won't be updated in ns0.nn0
+        // since it isn't monitored by the Router.
+        assertTrue(nnInfo + " shouldn't be updated: " + diff,
+            modTime < initializedTime);
+      } else {
+        // Other Namenodes should be updated as expected
+        assertTrue(nnInfo + " should be updated: " + diff,
+            modTime > initializedTime);
+      }
+    }
+  }
 
-    // other namnodes should be updated as expected
-    assertEquals("nn1", namespaceInfo0.get(0).getNamenodeId());
-    assertTrue(namespaceInfo0.get(0).getDateModified() > initializedTime);
+  @Test
+  public void testNamenodeMonitoringConfig() throws Exception {
+    testConfig(asList(), "");
+    testConfig(asList("ns1.nn0"), "ns1.nn0");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0, ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), " ns1.nn0,ns1.nn1");
+    testConfig(asList("ns1.nn0", "ns1.nn1"), "ns1.nn0,ns1.nn1,");
+  }
 
-    assertEquals("nn0", namespaceInfo1.get(0).getNamenodeId());
-    assertTrue(namespaceInfo1.get(0).getDateModified() > initializedTime);
+  /**
+   * Test if configuring a Router to monitor particular Namenodes actually
+   * takes effect.
+   * @param expectedNNs Namenodes that should be monitored.
+   * @param confNsIds Router configuration setting for Namenodes to monitor.
+   */
+  private void testConfig(
+      Collection<String> expectedNNs, String confNsIds) {
 
-    assertEquals("nn1", namespaceInfo1.get(1).getNamenodeId());
-    assertTrue(namespaceInfo1.get(1).getDateModified() > initializedTime);
+    // Set up and start the Router
+    Configuration conf = getNamenodesConfig();
+    Configuration routerConf = new RouterConfigBuilder(conf)
+        .heartbeat(true)
+        .build();
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
+    routerConf.set(RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE, confNsIds);
+    router = new Router();
+    router.init(routerConf);
+
+    // Test the heartbeat services of the Router
+    Collection<NamenodeHeartbeatService> heartbeatServices =
+        router.getNamenodeHeartbeatServices();
+    assertNamenodeHeartbeatService(expectedNNs, heartbeatServices);
+  }
+
+  /**
+   * Assert that the namenodes monitored by the Router are the expected ones.
+   * @param expected Expected namenodes.
+   * @param actual Actual heartbeat services for the Router.
+   */
+  private static void assertNamenodeHeartbeatService(
+      Collection<String> expected,
+      Collection<NamenodeHeartbeatService> actual) {
+
+    final Set<String> actualSet = new TreeSet<>();
+    for (NamenodeHeartbeatService heartbeatService : actual) {
+      NamenodeStatusReport report = heartbeatService.getNamenodeStatusReport();
+      StringBuilder sb = new StringBuilder();
+      sb.append(report.getNameserviceId());
+      sb.append(".");
+      sb.append(report.getNamenodeId());
+      actualSet.add(sb.toString());
+    }
+    assertTrue(expected + " does not contain all " + actualSet,
+        expected.containsAll(actualSet));
+    assertTrue(actualSet + " does not contain all " + expected,
+        actualSet.containsAll(expected));
   }
 }