diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 78e5855746..b7c2b03e5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -949,10 +949,13 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
       for (DatanodeStorageReport dn : dns) {
         DatanodeInfo dnInfo = dn.getDatanodeInfo();
         String nodeId = dnInfo.getXferAddr();
-        if (!datanodesMap.containsKey(nodeId)) {
+        DatanodeStorageReport oldDn = datanodesMap.get(nodeId);
+        if (oldDn == null ||
+            dnInfo.getLastUpdate() > oldDn.getDatanodeInfo().getLastUpdate()) {
           datanodesMap.put(nodeId, dn);
+        } else {
+          LOG.debug("{} is in multiple subclusters", nodeId);
         }
-        // TODO merge somehow, right now it just takes the first one
       }
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index f33d1d36e9..2ae78c9d50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -886,7 +886,8 @@ public DatanodeInfo[] getDatanodeReport(
       DatanodeInfo[] result = entry.getValue();
       for (DatanodeInfo node : result) {
         String nodeId = node.getXferAddr();
-        if (!datanodesMap.containsKey(nodeId)) {
+        DatanodeInfo dn = datanodesMap.get(nodeId);
+        if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
           // Add the subcluster as a suffix to the network location
           node.setNetworkLocation(
               NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
index 8b5fb5498a..97428c46af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
@@ -80,8 +81,11 @@
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -119,6 +123,8 @@ public class MockNamenode {
   private String nsId;
   /** HA state of the Namenode. */
   private HAServiceState haState = HAServiceState.STANDBY;
+  /** Datanodes registered in this Namenode. */
+  private List<DatanodeInfo> dns = new ArrayList<>();
 
   /** RPC server of the Namenode that redirects calls to the mock. */
   private Server rpcServer;
@@ -294,6 +300,14 @@ public void transitionToStandby() {
     this.haState = HAServiceState.STANDBY;
   }
 
+  /**
+   * Get the datanodes that this NN will return.
+   * @return The datanodes that this NN will return.
+   */
+  public List<DatanodeInfo> getDatanodes() {
+    return this.dns;
+  }
+
   /**
    * Stop the Mock Namenode. It stops all the servers.
    * @throws Exception If it cannot stop the Namenode.
@@ -452,6 +466,33 @@ public void addFileSystemMock() throws IOException {
         });
   }
 
+  /**
+   * Add datanode related operations.
+   * @throws IOException If it cannot be setup.
+   */
+  public void addDatanodeMock() throws IOException {
+    when(mockNn.getDatanodeReport(any(DatanodeReportType.class))).thenAnswer(
+        invocation -> {
+          LOG.info("{} getDatanodeReport({})", nsId, invocation.getArgument(0));
+          return dns.toArray(new DatanodeInfo[dns.size()]);
+        });
+    when(mockNn.getDatanodeStorageReport(any(DatanodeReportType.class)))
+        .thenAnswer(invocation -> {
+          LOG.info("{} getDatanodeStorageReport({})",
+              nsId, invocation.getArgument(0));
+          DatanodeStorageReport[] ret = new DatanodeStorageReport[dns.size()];
+          for (int i = 0; i < dns.size(); i++) {
+            DatanodeInfo dn = dns.get(i);
+            DatanodeStorage storage = new DatanodeStorage(dn.getName());
+            StorageReport[] storageReports = new StorageReport[] {
+                new StorageReport(storage, false, 0L, 0L, 0L, 0L, 0L)
+            };
+            ret[i] = new DatanodeStorageReport(dn, storageReports);
+          }
+          return ret;
+        });
+  }
+
   private static String getSrc(InvocationOnMock invocation) {
     return (String) invocation.getArguments()[0];
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
index 9fcfcb4ae3..b5f5b6701f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java
@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import static java.util.Arrays.asList;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
+import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -31,9 +35,15 @@
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.federation.MockNamenode;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
@@ -42,6 +52,7 @@
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -67,6 +78,8 @@ public class TestRouterNamenodeMonitoring {
   private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
   /** Nameservices in the federated cluster. */
   private List<String> nsIds = asList("ns0", "ns1");
+  /** Namenodes in the cluster. */
+  private List<String> nnIds = asList("nn0", "nn1");
 
   /** Time the test starts. */
   private long initializedTime;
@@ -77,7 +90,7 @@ public void setup() throws Exception {
     LOG.info("Initialize the Mock Namenodes to monitor");
     for (String nsId : nsIds) {
       nns.put(nsId, new HashMap<>());
-      for (String nnId : asList("nn0", "nn1")) {
+      for (String nnId : nnIds) {
         nns.get(nsId).put(nnId, new MockNamenode(nsId));
       }
     }
@@ -115,14 +128,14 @@ private Configuration getNamenodesConfig() {
     conf.set(DFSConfigKeys.DFS_NAMESERVICES,
         StringUtils.join(",", nns.keySet()));
     for (String nsId : nns.keySet()) {
-      Set<String> nnIds = nns.get(nsId).keySet();
+      Set<String> nsNnIds = nns.get(nsId).keySet();
 
       StringBuilder sb = new StringBuilder();
       sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
       sb.append(".").append(nsId);
-      conf.set(sb.toString(), StringUtils.join(",", nnIds));
+      conf.set(sb.toString(), StringUtils.join(",", nsNnIds));
 
-      for (String nnId : nnIds) {
+      for (String nnId : nsNnIds) {
         final MockNamenode nn = nns.get(nsId).get(nnId);
 
         sb = new StringBuilder();
@@ -314,4 +327,92 @@ private void verifyUrlSchemes(String scheme) {
       assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
     }
   }
+
+  /**
+   * Test the view of the Datanodes that the Router sees. If a Datanode is
+   * registered in two subclusters, it should return the most up to date
+   * information.
+   * @throws IOException If the test cannot run.
+   */
+  @Test
+  public void testDatanodesView() throws IOException {
+
+    // Setup the router
+    Configuration routerConf = new RouterConfigBuilder()
+        .stateStore()
+        .rpc()
+        .build();
+    router = new Router();
+    router.init(routerConf);
+    router.start();
+
+    // Setup the namenodes
+    for (String nsId : nsIds) {
+      registerSubclusters(router, nns.get(nsId).values());
+      for (String nnId : nnIds) {
+        MockNamenode nn = nns.get(nsId).get(nnId);
+        if ("nn0".equals(nnId)) {
+          nn.transitionToActive();
+        }
+        nn.addDatanodeMock();
+      }
+    }
+
+    // Set different states for the DNs in each namespace
+    long time = Time.now();
+    for (String nsId : nsIds) {
+      for (String nnId : nnIds) {
+        // dn0 is DECOMMISSIONED in the most recent (ns1)
+        DatanodeInfoBuilder dn0Builder = new DatanodeInfoBuilder()
+            .setDatanodeUuid("dn0")
+            .setHostName("dn0")
+            .setIpAddr("dn0")
+            .setXferPort(10000);
+        if ("ns0".equals(nsId)) {
+          dn0Builder.setLastUpdate(time - 1000);
+          dn0Builder.setAdminState(AdminStates.NORMAL);
+        } else if ("ns1".equals(nsId)) {
+          dn0Builder.setLastUpdate(time - 500);
+          dn0Builder.setAdminState(AdminStates.DECOMMISSIONED);
+        }
+
+        // dn1 is NORMAL in the most recent (ns0)
+        DatanodeInfoBuilder dn1Builder = new DatanodeInfoBuilder()
+            .setDatanodeUuid("dn1")
+            .setHostName("dn1")
+            .setIpAddr("dn1")
+            .setXferPort(10000);
+        if ("ns0".equals(nsId)) {
+          dn1Builder.setLastUpdate(time - 1000);
+          dn1Builder.setAdminState(AdminStates.NORMAL);
+        } else if ("ns1".equals(nsId)) {
+          dn1Builder.setLastUpdate(time - 5 * 1000);
+          dn1Builder.setAdminState(AdminStates.DECOMMISSION_INPROGRESS);
+        }
+
+        // Update the mock NameNode with the DN views
+        MockNamenode nn = nns.get(nsId).get(nnId);
+        List<DatanodeInfo> dns = nn.getDatanodes();
+        dns.add(dn0Builder.build());
+        dns.add(dn1Builder.build());
+      }
+    }
+
+    // Get the datanodes from the Router and check we get the right view
+    DistributedFileSystem dfs = (DistributedFileSystem) getFileSystem(router);
+    DFSClient dfsClient = dfs.getClient();
+    DatanodeStorageReport[] dns = dfsClient.getDatanodeStorageReport(
+        DatanodeReportType.ALL);
+    assertEquals(2, dns.length);
+    for (DatanodeStorageReport dn : dns) {
+      DatanodeInfo dnInfo = dn.getDatanodeInfo();
+      if ("dn0".equals(dnInfo.getHostName())) {
+        assertEquals(AdminStates.DECOMMISSIONED, dnInfo.getAdminState());
+      } else if ("dn1".equals(dnInfo.getHostName())) {
+        assertEquals(AdminStates.NORMAL, dnInfo.getAdminState());
+      } else {
+        fail("Unexpected DN: " + dnInfo.getHostName());
+      }
+    }
+  }
 }
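
Review note: the two production hunks above (RouterClientProtocol#getDatanodeStorageReport and RouterRpcServer#getDatanodeReport) apply the same last-writer-wins merge rule: when a datanode's transfer address is reported by more than one subcluster, keep the report with the newest lastUpdate. Below is a minimal, self-contained sketch of that rule; the Report class and its fields are illustrative stand-ins, not the real HDFS types.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class LatestReportMerge {

  /** Illustrative stand-in for a per-datanode report. */
  static final class Report {
    final String xferAddr; // host:port the datanode serves data on
    final long lastUpdate; // heartbeat timestamp; newer wins

    Report(String xferAddr, long lastUpdate) {
      this.xferAddr = xferAddr;
      this.lastUpdate = lastUpdate;
    }
  }

  /** Keep only the freshest report per transfer address. */
  static Map<String, Report> merge(Iterable<Report> reports) {
    Map<String, Report> latest = new HashMap<>();
    for (Report r : reports) {
      Report old = latest.get(r.xferAddr);
      if (old == null || r.lastUpdate > old.lastUpdate) {
        latest.put(r.xferAddr, r);
      }
    }
    return latest;
  }

  public static void main(String[] args) {
    Map<String, Report> merged = merge(Arrays.asList(
        new Report("dn0:10000", 1000L),   // stale view from one subcluster
        new Report("dn0:10000", 1500L),   // fresher view from another
        new Report("dn1:10000", 2000L))); // seen only once
    // Prints 1500: the fresher view of dn0 wins.
    System.out.println(merged.get("dn0:10000").lastUpdate);
  }
}

Under this rule the test's expectations follow directly: dn0's freshest report (time - 500, from ns1) is DECOMMISSIONED and dn1's freshest report (time - 1000, from ns0) is NORMAL, which is exactly what testDatanodesView asserts.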