HDDS-2199. In SCMNodeManager dnsToUuidMap cannot track multiple DNs on the same host
Closes #1551
This commit is contained in:
parent
d061c8469f
commit
6171a41b4c
@ -192,11 +192,11 @@ void processNodeReport(DatanodeDetails datanodeDetails,
|
|||||||
DatanodeDetails getNodeByUuid(String uuid);
|
DatanodeDetails getNodeByUuid(String uuid);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given datanode address(Ipaddress or hostname), returns the DatanodeDetails
|
* Given datanode address(Ipaddress or hostname), returns a list of
|
||||||
* for the node.
|
* DatanodeDetails for the datanodes running at that address.
|
||||||
*
|
*
|
||||||
* @param address datanode address
|
* @param address datanode address
|
||||||
* @return the given datanode, or null if not found
|
* @return the given datanode, or empty list if none found
|
||||||
*/
|
*/
|
||||||
DatanodeDetails getNodeByAddress(String address);
|
List<DatanodeDetails> getNodesByAddress(String address);
|
||||||
}
|
}
|
||||||
|
@ -25,11 +25,13 @@
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
|
||||||
@ -98,7 +100,7 @@ public class SCMNodeManager implements NodeManager {
|
|||||||
private final NetworkTopology clusterMap;
|
private final NetworkTopology clusterMap;
|
||||||
private final DNSToSwitchMapping dnsToSwitchMapping;
|
private final DNSToSwitchMapping dnsToSwitchMapping;
|
||||||
private final boolean useHostname;
|
private final boolean useHostname;
|
||||||
private final ConcurrentHashMap<String, String> dnsToUuidMap =
|
private final ConcurrentHashMap<String, Set<String>> dnsToUuidMap =
|
||||||
new ConcurrentHashMap<>();
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -260,7 +262,7 @@ public RegisteredCommand register(
|
|||||||
}
|
}
|
||||||
nodeStateManager.addNode(datanodeDetails);
|
nodeStateManager.addNode(datanodeDetails);
|
||||||
clusterMap.add(datanodeDetails);
|
clusterMap.add(datanodeDetails);
|
||||||
dnsToUuidMap.put(dnsName, datanodeDetails.getUuidString());
|
addEntryTodnsToUuidMap(dnsName, datanodeDetails.getUuidString());
|
||||||
// Updating Node Report, as registration is successful
|
// Updating Node Report, as registration is successful
|
||||||
processNodeReport(datanodeDetails, nodeReport);
|
processNodeReport(datanodeDetails, nodeReport);
|
||||||
LOG.info("Registered Data node : {}", datanodeDetails);
|
LOG.info("Registered Data node : {}", datanodeDetails);
|
||||||
@ -275,6 +277,26 @@ public RegisteredCommand register(
|
|||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an entry to the dnsToUuidMap, which maps hostname / IP to the DNs
|
||||||
|
* running on that host. As each address can have many DNs running on it,
|
||||||
|
* this is a one to many mapping.
|
||||||
|
* @param dnsName String representing the hostname or IP of the node
|
||||||
|
* @param uuid String representing the UUID of the registered node.
|
||||||
|
*/
|
||||||
|
@SuppressFBWarnings(value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
|
||||||
|
justification="The method is synchronized and this is the only place "+
|
||||||
|
"dnsToUuidMap is modified")
|
||||||
|
private synchronized void addEntryTodnsToUuidMap(
|
||||||
|
String dnsName, String uuid) {
|
||||||
|
Set<String> dnList = dnsToUuidMap.get(dnsName);
|
||||||
|
if (dnList == null) {
|
||||||
|
dnList = ConcurrentHashMap.newKeySet();
|
||||||
|
dnsToUuidMap.put(dnsName, dnList);
|
||||||
|
}
|
||||||
|
dnList.add(uuid);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send heartbeat to indicate the datanode is alive and doing well.
|
* Send heartbeat to indicate the datanode is alive and doing well.
|
||||||
*
|
*
|
||||||
@ -584,29 +606,34 @@ public DatanodeDetails getNodeByUuid(String uuid) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given datanode address(Ipaddress or hostname), returns the DatanodeDetails
|
* Given datanode address(Ipaddress or hostname), return a list of
|
||||||
* for the node.
|
* DatanodeDetails for the datanodes registered on that address.
|
||||||
*
|
*
|
||||||
* @param address datanode address
|
* @param address datanode address
|
||||||
* @return the given datanode, or null if not found
|
* @return the given datanode, or empty list if none found
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public DatanodeDetails getNodeByAddress(String address) {
|
public List<DatanodeDetails> getNodesByAddress(String address) {
|
||||||
|
List<DatanodeDetails> results = new LinkedList<>();
|
||||||
if (Strings.isNullOrEmpty(address)) {
|
if (Strings.isNullOrEmpty(address)) {
|
||||||
LOG.warn("address is null");
|
LOG.warn("address is null");
|
||||||
return null;
|
return results;
|
||||||
}
|
}
|
||||||
String uuid = dnsToUuidMap.get(address);
|
Set<String> uuids = dnsToUuidMap.get(address);
|
||||||
if (uuid != null) {
|
if (uuids == null) {
|
||||||
|
LOG.warn("Cannot find node for address {}", address);
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (String uuid : uuids) {
|
||||||
DatanodeDetails temp = DatanodeDetails.newBuilder().setUuid(uuid).build();
|
DatanodeDetails temp = DatanodeDetails.newBuilder().setUuid(uuid).build();
|
||||||
try {
|
try {
|
||||||
return nodeStateManager.getNode(temp);
|
results.add(nodeStateManager.getNode(temp));
|
||||||
} catch (NodeNotFoundException e) {
|
} catch (NodeNotFoundException e) {
|
||||||
LOG.warn("Cannot find node for uuid {}", uuid);
|
LOG.warn("Cannot find node for uuid {}", uuid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.warn("Cannot find node for address {}", address);
|
return results;
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String nodeResolve(String hostname) {
|
private String nodeResolve(String hostname) {
|
||||||
|
@ -295,7 +295,12 @@ public List<DatanodeDetails> sortDatanodes(List<String> nodes,
|
|||||||
boolean auditSuccess = true;
|
boolean auditSuccess = true;
|
||||||
try{
|
try{
|
||||||
NodeManager nodeManager = scm.getScmNodeManager();
|
NodeManager nodeManager = scm.getScmNodeManager();
|
||||||
Node client = nodeManager.getNodeByAddress(clientMachine);
|
Node client = null;
|
||||||
|
List<DatanodeDetails> possibleClients =
|
||||||
|
nodeManager.getNodesByAddress(clientMachine);
|
||||||
|
if (possibleClients.size()>0){
|
||||||
|
client = possibleClients.get(0);
|
||||||
|
}
|
||||||
List<Node> nodeList = new ArrayList();
|
List<Node> nodeList = new ArrayList();
|
||||||
nodes.stream().forEach(uuid -> {
|
nodes.stream().forEach(uuid -> {
|
||||||
DatanodeDetails node = nodeManager.getNodeByUuid(uuid);
|
DatanodeDetails node = nodeManager.getNodeByUuid(uuid);
|
||||||
|
@ -88,7 +88,7 @@ public class MockNodeManager implements NodeManager {
|
|||||||
private final Node2PipelineMap node2PipelineMap;
|
private final Node2PipelineMap node2PipelineMap;
|
||||||
private final Node2ContainerMap node2ContainerMap;
|
private final Node2ContainerMap node2ContainerMap;
|
||||||
private NetworkTopology clusterMap;
|
private NetworkTopology clusterMap;
|
||||||
private ConcurrentHashMap<String, String> dnsToUuidMap;
|
private ConcurrentHashMap<String, Set<String>> dnsToUuidMap;
|
||||||
|
|
||||||
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
|
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
|
||||||
this.healthyNodes = new LinkedList<>();
|
this.healthyNodes = new LinkedList<>();
|
||||||
@ -386,7 +386,7 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails,
|
|||||||
try {
|
try {
|
||||||
node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
|
node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
|
||||||
Collections.emptySet());
|
Collections.emptySet());
|
||||||
dnsToUuidMap.put(datanodeDetails.getIpAddress(),
|
addEntryTodnsToUuidMap(datanodeDetails.getIpAddress(),
|
||||||
datanodeDetails.getUuidString());
|
datanodeDetails.getUuidString());
|
||||||
if (clusterMap != null) {
|
if (clusterMap != null) {
|
||||||
datanodeDetails.setNetworkName(datanodeDetails.getUuidString());
|
datanodeDetails.setNetworkName(datanodeDetails.getUuidString());
|
||||||
@ -398,6 +398,23 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails,
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an entry to the dnsToUuidMap, which maps hostname / IP to the DNs
|
||||||
|
* running on that host. As each address can have many DNs running on it,
|
||||||
|
* this is a one to many mapping.
|
||||||
|
* @param dnsName String representing the hostname or IP of the node
|
||||||
|
* @param uuid String representing the UUID of the registered node.
|
||||||
|
*/
|
||||||
|
private synchronized void addEntryTodnsToUuidMap(
|
||||||
|
String dnsName, String uuid) {
|
||||||
|
Set<String> dnList = dnsToUuidMap.get(dnsName);
|
||||||
|
if (dnList == null) {
|
||||||
|
dnList = ConcurrentHashMap.newKeySet();
|
||||||
|
dnsToUuidMap.put(dnsName, dnList);
|
||||||
|
}
|
||||||
|
dnList.add(uuid);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send heartbeat to indicate the datanode is alive and doing well.
|
* Send heartbeat to indicate the datanode is alive and doing well.
|
||||||
*
|
*
|
||||||
@ -484,8 +501,19 @@ public DatanodeDetails getNodeByUuid(String uuid) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatanodeDetails getNodeByAddress(String address) {
|
public List<DatanodeDetails> getNodesByAddress(String address) {
|
||||||
return getNodeByUuid(dnsToUuidMap.get(address));
|
List<DatanodeDetails> results = new LinkedList<>();
|
||||||
|
Set<String> uuids = dnsToUuidMap.get(address);
|
||||||
|
if (uuids == null) {
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
for(String uuid : uuids) {
|
||||||
|
DatanodeDetails dn = getNodeByUuid(uuid);
|
||||||
|
if (dn != null) {
|
||||||
|
results.add(dn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setNetworkTopology(NetworkTopology topology) {
|
public void setNetworkTopology(NetworkTopology topology) {
|
||||||
|
@ -1064,6 +1064,25 @@ public void testScmRegisterNodeWithHostname()
|
|||||||
testScmRegisterNodeWithNetworkTopology(true);
|
testScmRegisterNodeWithNetworkTopology(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test getNodesByAddress when using IPs.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testgetNodesByAddressWithIpAddress()
|
||||||
|
throws IOException, InterruptedException, AuthenticationException {
|
||||||
|
testGetNodesByAddress(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test getNodesByAddress when using hostnames.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testgetNodesByAddressWithHostname()
|
||||||
|
throws IOException, InterruptedException, AuthenticationException {
|
||||||
|
testGetNodesByAddress(true);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test add node into a 4-layer network topology during node register.
|
* Test add node into a 4-layer network topology during node register.
|
||||||
*/
|
*/
|
||||||
@ -1152,11 +1171,55 @@ private void testScmRegisterNodeWithNetworkTopology(boolean useHostname)
|
|||||||
// test get node
|
// test get node
|
||||||
if (useHostname) {
|
if (useHostname) {
|
||||||
Arrays.stream(hostNames).forEach(hostname ->
|
Arrays.stream(hostNames).forEach(hostname ->
|
||||||
Assert.assertNotNull(nodeManager.getNodeByAddress(hostname)));
|
Assert.assertNotEquals(0, nodeManager.getNodesByAddress(hostname)
|
||||||
|
.size()));
|
||||||
} else {
|
} else {
|
||||||
Arrays.stream(ipAddress).forEach(ip ->
|
Arrays.stream(ipAddress).forEach(ip ->
|
||||||
Assert.assertNotNull(nodeManager.getNodeByAddress(ip)));
|
Assert.assertNotEquals(0, nodeManager.getNodesByAddress(ip)
|
||||||
|
.size()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test add node into a 4-layer network topology during node register.
|
||||||
|
*/
|
||||||
|
private void testGetNodesByAddress(boolean useHostname)
|
||||||
|
throws IOException, InterruptedException, AuthenticationException {
|
||||||
|
OzoneConfiguration conf = getConf();
|
||||||
|
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
|
||||||
|
MILLISECONDS);
|
||||||
|
|
||||||
|
// create a set of hosts - note two hosts on "host1"
|
||||||
|
String[] hostNames = {"host1", "host1", "host2", "host3", "host4"};
|
||||||
|
String[] ipAddress =
|
||||||
|
{"1.2.3.4", "1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"};
|
||||||
|
|
||||||
|
if (useHostname) {
|
||||||
|
conf.set(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, "true");
|
||||||
|
}
|
||||||
|
final int nodeCount = hostNames.length;
|
||||||
|
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
|
||||||
|
DatanodeDetails[] nodes = new DatanodeDetails[nodeCount];
|
||||||
|
for (int i = 0; i < nodeCount; i++) {
|
||||||
|
DatanodeDetails node = TestUtils.createDatanodeDetails(
|
||||||
|
UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null);
|
||||||
|
nodeManager.register(node, null, null);
|
||||||
|
}
|
||||||
|
// test get node
|
||||||
|
Assert.assertEquals(0, nodeManager.getNodesByAddress(null).size());
|
||||||
|
if (useHostname) {
|
||||||
|
Assert.assertEquals(2,
|
||||||
|
nodeManager.getNodesByAddress("host1").size());
|
||||||
|
Assert.assertEquals(1, nodeManager.getNodesByAddress("host2").size());
|
||||||
|
Assert.assertEquals(0, nodeManager.getNodesByAddress("unknown").size());
|
||||||
|
} else {
|
||||||
|
Assert.assertEquals(2,
|
||||||
|
nodeManager.getNodesByAddress("1.2.3.4").size());
|
||||||
|
Assert.assertEquals(1, nodeManager.getNodesByAddress("2.3.4.5").size());
|
||||||
|
Assert.assertEquals(0, nodeManager.getNodesByAddress("1.9.8.7").size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -44,6 +44,7 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Node Manager to test replication.
|
* A Node Manager to test replication.
|
||||||
@ -323,7 +324,7 @@ public DatanodeDetails getNodeByUuid(String address) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public DatanodeDetails getNodeByAddress(String address) {
|
public List<DatanodeDetails> getNodesByAddress(String address) {
|
||||||
return null;
|
return new LinkedList<>();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user