HDFS-15016. RBF: getDatanodeReport() should return the latest update. Contributed by Inigo Goiri.
This commit is contained in:
parent
d12ad9e8ad
commit
7fe924b1c0
@ -949,10 +949,13 @@ public DatanodeStorageReport[] getDatanodeStorageReport(
|
|||||||
for (DatanodeStorageReport dn : dns) {
|
for (DatanodeStorageReport dn : dns) {
|
||||||
DatanodeInfo dnInfo = dn.getDatanodeInfo();
|
DatanodeInfo dnInfo = dn.getDatanodeInfo();
|
||||||
String nodeId = dnInfo.getXferAddr();
|
String nodeId = dnInfo.getXferAddr();
|
||||||
if (!datanodesMap.containsKey(nodeId)) {
|
DatanodeStorageReport oldDn = datanodesMap.get(nodeId);
|
||||||
|
if (oldDn == null ||
|
||||||
|
dnInfo.getLastUpdate() > oldDn.getDatanodeInfo().getLastUpdate()) {
|
||||||
datanodesMap.put(nodeId, dn);
|
datanodesMap.put(nodeId, dn);
|
||||||
|
} else {
|
||||||
|
LOG.debug("{} is in multiple subclusters", nodeId);
|
||||||
}
|
}
|
||||||
// TODO merge somehow, right now it just takes the first one
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -886,7 +886,8 @@ public DatanodeInfo[] getDatanodeReport(
|
|||||||
DatanodeInfo[] result = entry.getValue();
|
DatanodeInfo[] result = entry.getValue();
|
||||||
for (DatanodeInfo node : result) {
|
for (DatanodeInfo node : result) {
|
||||||
String nodeId = node.getXferAddr();
|
String nodeId = node.getXferAddr();
|
||||||
if (!datanodesMap.containsKey(nodeId)) {
|
DatanodeInfo dn = datanodesMap.get(nodeId);
|
||||||
|
if (dn == null || node.getLastUpdate() > dn.getLastUpdate()) {
|
||||||
// Add the subcluster as a suffix to the network location
|
// Add the subcluster as a suffix to the network location
|
||||||
node.setNetworkLocation(
|
node.setNetworkLocation(
|
||||||
NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
|
NodeBase.PATH_SEPARATOR_STR + ns.getNameserviceId() +
|
||||||
|
@ -63,6 +63,7 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
|
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeProtocolService;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
|
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
|
||||||
@ -80,8 +81,11 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
import org.apache.hadoop.http.HttpServer2;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||||
@ -119,6 +123,8 @@ public class MockNamenode {
|
|||||||
private String nsId;
|
private String nsId;
|
||||||
/** HA state of the Namenode. */
|
/** HA state of the Namenode. */
|
||||||
private HAServiceState haState = HAServiceState.STANDBY;
|
private HAServiceState haState = HAServiceState.STANDBY;
|
||||||
|
/** Datanodes registered in this Namenode. */
|
||||||
|
private List<DatanodeInfo> dns = new ArrayList<>();
|
||||||
|
|
||||||
/** RPC server of the Namenode that redirects calls to the mock. */
|
/** RPC server of the Namenode that redirects calls to the mock. */
|
||||||
private Server rpcServer;
|
private Server rpcServer;
|
||||||
@ -294,6 +300,14 @@ public void transitionToStandby() {
|
|||||||
this.haState = HAServiceState.STANDBY;
|
this.haState = HAServiceState.STANDBY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the datanodes that this NN will return.
|
||||||
|
* @return The datanodes that this NN will return.
|
||||||
|
*/
|
||||||
|
public List<DatanodeInfo> getDatanodes() {
|
||||||
|
return this.dns;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop the Mock Namenode. It stops all the servers.
|
* Stop the Mock Namenode. It stops all the servers.
|
||||||
* @throws Exception If it cannot stop the Namenode.
|
* @throws Exception If it cannot stop the Namenode.
|
||||||
@ -452,6 +466,33 @@ public void addFileSystemMock() throws IOException {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add datanode related operations.
|
||||||
|
* @throws IOException If it cannot be setup.
|
||||||
|
*/
|
||||||
|
public void addDatanodeMock() throws IOException {
|
||||||
|
when(mockNn.getDatanodeReport(any(DatanodeReportType.class))).thenAnswer(
|
||||||
|
invocation -> {
|
||||||
|
LOG.info("{} getDatanodeReport()", nsId, invocation.getArgument(0));
|
||||||
|
return dns.toArray();
|
||||||
|
});
|
||||||
|
when(mockNn.getDatanodeStorageReport(any(DatanodeReportType.class)))
|
||||||
|
.thenAnswer(invocation -> {
|
||||||
|
LOG.info("{} getDatanodeStorageReport()",
|
||||||
|
nsId, invocation.getArgument(0));
|
||||||
|
DatanodeStorageReport[] ret = new DatanodeStorageReport[dns.size()];
|
||||||
|
for (int i = 0; i < dns.size(); i++) {
|
||||||
|
DatanodeInfo dn = dns.get(i);
|
||||||
|
DatanodeStorage storage = new DatanodeStorage(dn.getName());
|
||||||
|
StorageReport[] storageReports = new StorageReport[] {
|
||||||
|
new StorageReport(storage, false, 0L, 0L, 0L, 0L, 0L)
|
||||||
|
};
|
||||||
|
ret[i] = new DatanodeStorageReport(dn, storageReports);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private static String getSrc(InvocationOnMock invocation) {
|
private static String getSrc(InvocationOnMock invocation) {
|
||||||
return (String) invocation.getArguments()[0];
|
return (String) invocation.getArguments()[0];
|
||||||
}
|
}
|
||||||
|
@ -18,10 +18,14 @@
|
|||||||
package org.apache.hadoop.hdfs.server.federation.router;
|
package org.apache.hadoop.hdfs.server.federation.router;
|
||||||
|
|
||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
|
||||||
|
import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
|
||||||
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
|
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@ -31,9 +35,15 @@
|
|||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.LogVerificationAppender;
|
import org.apache.hadoop.hdfs.LogVerificationAppender;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.DatanodeInfoBuilder;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
|
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
|
||||||
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
|
||||||
@ -42,6 +52,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
|
||||||
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
@ -67,6 +78,8 @@ public class TestRouterNamenodeMonitoring {
|
|||||||
private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
|
private Map<String, Map<String, MockNamenode>> nns = new HashMap<>();
|
||||||
/** Nameservices in the federated cluster. */
|
/** Nameservices in the federated cluster. */
|
||||||
private List<String> nsIds = asList("ns0", "ns1");
|
private List<String> nsIds = asList("ns0", "ns1");
|
||||||
|
/** Namenodes in the cluster. */
|
||||||
|
private List<String> nnIds = asList("nn0", "nn1");
|
||||||
|
|
||||||
/** Time the test starts. */
|
/** Time the test starts. */
|
||||||
private long initializedTime;
|
private long initializedTime;
|
||||||
@ -77,7 +90,7 @@ public void setup() throws Exception {
|
|||||||
LOG.info("Initialize the Mock Namenodes to monitor");
|
LOG.info("Initialize the Mock Namenodes to monitor");
|
||||||
for (String nsId : nsIds) {
|
for (String nsId : nsIds) {
|
||||||
nns.put(nsId, new HashMap<>());
|
nns.put(nsId, new HashMap<>());
|
||||||
for (String nnId : asList("nn0", "nn1")) {
|
for (String nnId : nnIds) {
|
||||||
nns.get(nsId).put(nnId, new MockNamenode(nsId));
|
nns.get(nsId).put(nnId, new MockNamenode(nsId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -115,14 +128,14 @@ private Configuration getNamenodesConfig() {
|
|||||||
conf.set(DFSConfigKeys.DFS_NAMESERVICES,
|
conf.set(DFSConfigKeys.DFS_NAMESERVICES,
|
||||||
StringUtils.join(",", nns.keySet()));
|
StringUtils.join(",", nns.keySet()));
|
||||||
for (String nsId : nns.keySet()) {
|
for (String nsId : nns.keySet()) {
|
||||||
Set<String> nnIds = nns.get(nsId).keySet();
|
Set<String> nsNnIds = nns.get(nsId).keySet();
|
||||||
|
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
|
sb.append(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX);
|
||||||
sb.append(".").append(nsId);
|
sb.append(".").append(nsId);
|
||||||
conf.set(sb.toString(), StringUtils.join(",", nnIds));
|
conf.set(sb.toString(), StringUtils.join(",", nsNnIds));
|
||||||
|
|
||||||
for (String nnId : nnIds) {
|
for (String nnId : nsNnIds) {
|
||||||
final MockNamenode nn = nns.get(nsId).get(nnId);
|
final MockNamenode nn = nns.get(nsId).get(nnId);
|
||||||
|
|
||||||
sb = new StringBuilder();
|
sb = new StringBuilder();
|
||||||
@ -314,4 +327,92 @@ private void verifyUrlSchemes(String scheme) {
|
|||||||
assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
|
assertEquals(0, appender.countLinesWithMessage("JMX URL: https://"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test the view of the Datanodes that the Router sees. If a Datanode is
|
||||||
|
* registered in two subclusters, it should return the most up to date
|
||||||
|
* information.
|
||||||
|
* @throws IOException If the test cannot run.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDatanodesView() throws IOException {
|
||||||
|
|
||||||
|
// Setup the router
|
||||||
|
Configuration routerConf = new RouterConfigBuilder()
|
||||||
|
.stateStore()
|
||||||
|
.rpc()
|
||||||
|
.build();
|
||||||
|
router = new Router();
|
||||||
|
router.init(routerConf);
|
||||||
|
router.start();
|
||||||
|
|
||||||
|
// Setup the namenodes
|
||||||
|
for (String nsId : nsIds) {
|
||||||
|
registerSubclusters(router, nns.get(nsId).values());
|
||||||
|
for (String nnId : nnIds) {
|
||||||
|
MockNamenode nn = nns.get(nsId).get(nnId);
|
||||||
|
if ("nn0".equals(nnId)) {
|
||||||
|
nn.transitionToActive();
|
||||||
|
}
|
||||||
|
nn.addDatanodeMock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set different states for the DNs in each namespace
|
||||||
|
long time = Time.now();
|
||||||
|
for (String nsId : nsIds) {
|
||||||
|
for (String nnId : nnIds) {
|
||||||
|
// dn0 is DECOMMISSIONED in the most recent (ns1)
|
||||||
|
DatanodeInfoBuilder dn0Builder = new DatanodeInfoBuilder()
|
||||||
|
.setDatanodeUuid("dn0")
|
||||||
|
.setHostName("dn0")
|
||||||
|
.setIpAddr("dn0")
|
||||||
|
.setXferPort(10000);
|
||||||
|
if ("ns0".equals(nsId)) {
|
||||||
|
dn0Builder.setLastUpdate(time - 1000);
|
||||||
|
dn0Builder.setAdminState(AdminStates.NORMAL);
|
||||||
|
} else if ("ns1".equals(nsId)) {
|
||||||
|
dn0Builder.setLastUpdate(time - 500);
|
||||||
|
dn0Builder.setAdminState(AdminStates.DECOMMISSIONED);
|
||||||
|
}
|
||||||
|
|
||||||
|
// dn1 is NORMAL in the most recent (ns0)
|
||||||
|
DatanodeInfoBuilder dn1Builder = new DatanodeInfoBuilder()
|
||||||
|
.setDatanodeUuid("dn1")
|
||||||
|
.setHostName("dn1")
|
||||||
|
.setIpAddr("dn1")
|
||||||
|
.setXferPort(10000);
|
||||||
|
if ("ns0".equals(nsId)) {
|
||||||
|
dn1Builder.setLastUpdate(time - 1000);
|
||||||
|
dn1Builder.setAdminState(AdminStates.NORMAL);
|
||||||
|
} else if ("ns1".equals(nsId)) {
|
||||||
|
dn1Builder.setLastUpdate(time - 5 * 1000);
|
||||||
|
dn1Builder.setAdminState(AdminStates.DECOMMISSION_INPROGRESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the mock NameNode with the DN views
|
||||||
|
MockNamenode nn = nns.get(nsId).get(nnId);
|
||||||
|
List<DatanodeInfo> dns = nn.getDatanodes();
|
||||||
|
dns.add(dn0Builder.build());
|
||||||
|
dns.add(dn1Builder.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the datanodes from the Router and check we get the right view
|
||||||
|
DistributedFileSystem dfs = (DistributedFileSystem)getFileSystem(router);
|
||||||
|
DFSClient dfsClient = dfs.getClient();
|
||||||
|
DatanodeStorageReport[] dns = dfsClient.getDatanodeStorageReport(
|
||||||
|
DatanodeReportType.ALL);
|
||||||
|
assertEquals(2, dns.length);
|
||||||
|
for (DatanodeStorageReport dn : dns) {
|
||||||
|
DatanodeInfo dnInfo = dn.getDatanodeInfo();
|
||||||
|
if ("dn0".equals(dnInfo.getHostName())) {
|
||||||
|
assertEquals(AdminStates.DECOMMISSIONED, dnInfo.getAdminState());
|
||||||
|
} else if ("dn1".equals(dnInfo.getHostName())) {
|
||||||
|
assertEquals(AdminStates.NORMAL, dnInfo.getAdminState());
|
||||||
|
} else {
|
||||||
|
fail("Unexpected DN: " + dnInfo.getHostName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user