HDDS-1713. ReplicationManager fail to find proper node topology based… (#1112)

This commit is contained in:
Sammi Chen 2019-07-20 06:45:26 +08:00 committed by Xiaoyu Yao
parent 7f1b76ca35
commit 69a46a95bb
18 changed files with 220 additions and 183 deletions

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.protocol; package org.apache.hadoop.hdds.protocol;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@ -193,12 +194,12 @@ public static DatanodeDetails getFromProtoBuf(
builder.addPort(newPort( builder.addPort(newPort(
Port.Name.valueOf(port.getName().toUpperCase()), port.getValue())); Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
} }
if (datanodeDetailsProto.hasNetworkLocation()) {
builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
}
if (datanodeDetailsProto.hasNetworkName()) { if (datanodeDetailsProto.hasNetworkName()) {
builder.setNetworkName(datanodeDetailsProto.getNetworkName()); builder.setNetworkName(datanodeDetailsProto.getNetworkName());
} }
if (datanodeDetailsProto.hasNetworkLocation()) {
builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation());
}
return builder.build(); return builder.build();
} }
@ -219,8 +220,12 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() {
if (certSerialId != null) { if (certSerialId != null) {
builder.setCertSerialId(certSerialId); builder.setCertSerialId(certSerialId);
} }
builder.setNetworkLocation(getNetworkLocation()); if (!Strings.isNullOrEmpty(getNetworkName())) {
builder.setNetworkName(getNetworkName()); builder.setNetworkName(getNetworkName());
}
if (!Strings.isNullOrEmpty(getNetworkLocation())) {
builder.setNetworkLocation(getNetworkLocation());
}
for (Port port : ports) { for (Port port : ports) {
builder.addPorts(HddsProtos.Port.newBuilder() builder.addPorts(HddsProtos.Port.newBuilder()

View File

@ -128,6 +128,10 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
datanodeDetails.setHostName(response.getHostname()); datanodeDetails.setHostName(response.getHostname());
datanodeDetails.setIpAddress(response.getIpAddress()); datanodeDetails.setIpAddress(response.getIpAddress());
} }
if (response.hasNetworkName() && response.hasNetworkLocation()) {
datanodeDetails.setNetworkName(response.getNetworkName());
datanodeDetails.setNetworkLocation(response.getNetworkLocation());
}
EndpointStateMachine.EndPointStates nextState = EndpointStateMachine.EndPointStates nextState =
rpcEndPoint.getState().getNextState(); rpcEndPoint.getState().getNextState();
rpcEndPoint.setState(nextState); rpcEndPoint.setState(nextState);

View File

@ -17,7 +17,8 @@
*/ */
package org.apache.hadoop.ozone.protocol.commands; package org.apache.hadoop.ozone.protocol.commands;
import com.google.common.base.Preconditions; import com.google.common.base.Strings;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
@ -28,23 +29,15 @@
* Response to Datanode Register call. * Response to Datanode Register call.
*/ */
public class RegisteredCommand { public class RegisteredCommand {
private String datanodeUUID;
private String clusterID; private String clusterID;
private ErrorCode error; private ErrorCode error;
private String hostname; private DatanodeDetails datanode;
private String ipAddress;
public RegisteredCommand(final ErrorCode error, final String datanodeUUID, public RegisteredCommand(final ErrorCode error, final DatanodeDetails node,
final String clusterID) { final String clusterID) {
this(error, datanodeUUID, clusterID, null, null); this.datanode = node;
}
public RegisteredCommand(final ErrorCode error, final String datanodeUUID,
final String clusterID, final String hostname, final String ipAddress) {
this.datanodeUUID = datanodeUUID;
this.clusterID = clusterID; this.clusterID = clusterID;
this.error = error; this.error = error;
this.hostname = hostname;
this.ipAddress = ipAddress;
} }
/** /**
@ -57,12 +50,12 @@ public static Builder newBuilder() {
} }
/** /**
* Returns datanode UUID. * Returns datanode.
* *
* @return - Datanode ID. * @return - Datanode.
*/ */
public String getDatanodeUUID() { public DatanodeDetails getDatanode() {
return datanodeUUID; return datanode;
} }
/** /**
@ -83,79 +76,54 @@ public ErrorCode getError() {
return error; return error;
} }
/**
* Returns the hostname.
*
* @return - hostname
*/
public String getHostName() {
return hostname;
}
/**
* Returns the ipAddress of the dataNode.
*/
public String getIpAddress() {
return ipAddress;
}
/** /**
* Gets the protobuf message of this object. * Gets the protobuf message of this object.
* *
* @return A protobuf message. * @return A protobuf message.
*/ */
public byte[] getProtoBufMessage() { public SCMRegisteredResponseProto getProtoBufMessage() {
SCMRegisteredResponseProto.Builder builder = SCMRegisteredResponseProto.Builder builder =
SCMRegisteredResponseProto.newBuilder() SCMRegisteredResponseProto.newBuilder()
// TODO : Fix this later when we have multiple SCM support.
// .setAddressList(addressList)
.setClusterID(this.clusterID) .setClusterID(this.clusterID)
.setDatanodeUUID(this.datanodeUUID) .setDatanodeUUID(this.datanode.getUuidString())
.setErrorCode(this.error); .setErrorCode(this.error);
if (hostname != null && ipAddress != null) { if (!Strings.isNullOrEmpty(datanode.getHostName())) {
builder.setHostname(hostname).setIpAddress(ipAddress); builder.setHostname(datanode.getHostName());
} }
return builder.build().toByteArray(); if (!Strings.isNullOrEmpty(datanode.getIpAddress())) {
builder.setIpAddress(datanode.getIpAddress());
}
if (!Strings.isNullOrEmpty(datanode.getNetworkName())) {
builder.setNetworkName(datanode.getNetworkName());
}
if (!Strings.isNullOrEmpty(datanode.getNetworkLocation())) {
builder.setNetworkLocation(datanode.getNetworkLocation());
}
return builder.build();
} }
/** /**
* A builder class to verify all values are sane. * A builder class to verify all values are sane.
*/ */
public static class Builder { public static class Builder {
private String datanodeUUID; private DatanodeDetails datanode;
private String clusterID; private String clusterID;
private ErrorCode error; private ErrorCode error;
private String ipAddress;
private String hostname;
/** /**
* sets UUID. * sets datanode details.
* *
* @param dnUUID - datanode UUID * @param node - datanode details
* @return Builder * @return Builder
*/ */
public Builder setDatanodeUUID(String dnUUID) { public Builder setDatanode(DatanodeDetails node) {
this.datanodeUUID = dnUUID; this.datanode = node;
return this; return this;
} }
/**
* Create this object from a Protobuf message.
*
* @param response - RegisteredCmdResponseProto
* @return RegisteredCommand
*/
public RegisteredCommand getFromProtobuf(SCMRegisteredResponseProto
response) {
Preconditions.checkNotNull(response);
if (response.hasHostname() && response.hasIpAddress()) {
return new RegisteredCommand(response.getErrorCode(),
response.getDatanodeUUID(), response.getClusterID(),
response.getHostname(), response.getIpAddress());
} else {
return new RegisteredCommand(response.getErrorCode(),
response.getDatanodeUUID(), response.getClusterID());
}
}
/** /**
* Sets cluster ID. * Sets cluster ID.
* *
@ -178,38 +146,19 @@ public Builder setErrorCode(ErrorCode errorCode) {
return this; return this;
} }
/**
* sets the hostname.
*/
public Builder setHostname(String host) {
this.hostname = host;
return this;
}
public Builder setIpAddress(String ipAddr) {
this.ipAddress = ipAddr;
return this;
}
/** /**
* Build the command object. * Build the command object.
* *
* @return RegisteredCommand * @return RegisteredCommand
*/ */
public RegisteredCommand build() { public RegisteredCommand build() {
if ((this.error == ErrorCode.success) && (this.datanodeUUID == null if ((this.error == ErrorCode.success) && (this.datanode == null
|| this.datanodeUUID.isEmpty()) || (this.clusterID == null || Strings.isNullOrEmpty(this.datanode.getUuidString())
|| this.clusterID.isEmpty())) { || Strings.isNullOrEmpty(this.clusterID))) {
throw new IllegalArgumentException("On success, RegisteredCommand " throw new IllegalArgumentException("On success, RegisteredCommand "
+ "needs datanodeUUID and ClusterID."); + "needs datanodeUUID and ClusterID.");
} }
if (hostname != null && ipAddress != null) { return new RegisteredCommand(this.error, this.datanode, this.clusterID);
return new RegisteredCommand(this.error, this.datanodeUUID,
this.clusterID, this.hostname, this.ipAddress);
} else {
return new RegisteredCommand(this.error, this.datanodeUUID,
this.clusterID);
}
} }
} }
} }

View File

@ -70,6 +70,8 @@ message SCMRegisteredResponseProto {
optional SCMNodeAddressList addressList = 4; optional SCMNodeAddressList addressList = 4;
optional string hostname = 5; optional string hostname = 5;
optional string ipAddress = 6; optional string ipAddress = 6;
optional string networkName = 7;
optional string networkLocation = 8;
} }
/** /**

View File

@ -173,11 +173,19 @@ void processNodeReport(DatanodeDetails datanodeDetails,
List<SCMCommand> getCommandQueue(UUID dnID); List<SCMCommand> getCommandQueue(UUID dnID);
/** /**
* Given datanode host address, returns the DatanodeDetails for the * Given datanode uuid, returns the DatanodeDetails for the node.
* node.
* *
* @param address node host address * @param uuid datanode uuid
* @return the given datanode, or null if not found * @return the given datanode, or null if not found
*/ */
DatanodeDetails getNode(String address); DatanodeDetails getNodeByUuid(String uuid);
/**
* Given datanode address(Ipaddress or hostname), returns the DatanodeDetails
* for the node.
*
* @param address datanode address
* @return the given datanode, or null if not found
*/
DatanodeDetails getNodeByAddress(String address);
} }

View File

@ -19,15 +19,13 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos; .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.net.InnerNode;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology; import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
@ -74,6 +72,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -106,6 +105,8 @@ public class SCMNodeManager implements NodeManager {
private final NetworkTopology clusterMap; private final NetworkTopology clusterMap;
private final DNSToSwitchMapping dnsToSwitchMapping; private final DNSToSwitchMapping dnsToSwitchMapping;
private final boolean useHostname; private final boolean useHostname;
private final ConcurrentHashMap<String, String> dnsToUuidMap =
new ConcurrentHashMap<>();
/** /**
* Constructs SCM machine Manager. * Constructs SCM machine Manager.
@ -252,19 +253,21 @@ public RegisteredCommand register(
datanodeDetails.setIpAddress(dnAddress.getHostAddress()); datanodeDetails.setIpAddress(dnAddress.getHostAddress());
} }
try { try {
String location; String dnsName;
String networkLocation;
datanodeDetails.setNetworkName(datanodeDetails.getUuidString());
if (useHostname) { if (useHostname) {
datanodeDetails.setNetworkName(datanodeDetails.getHostName()); dnsName = datanodeDetails.getHostName();
location = nodeResolve(datanodeDetails.getHostName());
} else { } else {
datanodeDetails.setNetworkName(datanodeDetails.getIpAddress()); dnsName = datanodeDetails.getIpAddress();
location = nodeResolve(datanodeDetails.getIpAddress());
} }
if (location != null) { networkLocation = nodeResolve(dnsName);
datanodeDetails.setNetworkLocation(location); if (networkLocation != null) {
datanodeDetails.setNetworkLocation(networkLocation);
} }
nodeStateManager.addNode(datanodeDetails); nodeStateManager.addNode(datanodeDetails);
clusterMap.add(datanodeDetails); clusterMap.add(datanodeDetails);
dnsToUuidMap.put(dnsName, datanodeDetails.getUuidString());
// Updating Node Report, as registration is successful // Updating Node Report, as registration is successful
processNodeReport(datanodeDetails, nodeReport); processNodeReport(datanodeDetails, nodeReport);
LOG.info("Registered Data node : {}", datanodeDetails); LOG.info("Registered Data node : {}", datanodeDetails);
@ -274,10 +277,8 @@ public RegisteredCommand register(
} }
return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success) return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success)
.setDatanodeUUID(datanodeDetails.getUuidString()) .setDatanode(datanodeDetails)
.setClusterID(this.scmStorageConfig.getClusterID()) .setClusterID(this.scmStorageConfig.getClusterID())
.setHostname(datanodeDetails.getHostName())
.setIpAddress(datanodeDetails.getIpAddress())
.build(); .build();
} }
@ -553,33 +554,49 @@ public List<SCMCommand> getCommandQueue(UUID dnID) {
} }
/** /**
* Given datanode address or host name, returns the DatanodeDetails for the * Given datanode uuid, returns the DatanodeDetails for the node.
* node.
* *
* @param address node host address * @param uuid node host address
* @return the given datanode, or null if not found * @return the given datanode, or null if not found
*/ */
@Override @Override
public DatanodeDetails getNode(String address) { public DatanodeDetails getNodeByUuid(String uuid) {
Node node = null; if (Strings.isNullOrEmpty(uuid)) {
String location = nodeResolve(address); LOG.warn("uuid is null");
if (location != null) { return null;
node = clusterMap.getNode(location + NetConstants.PATH_SEPARATOR_STR +
address);
} }
DatanodeDetails temp = DatanodeDetails.newBuilder().setUuid(uuid).build();
try {
return nodeStateManager.getNode(temp);
} catch (NodeNotFoundException e) {
LOG.warn("Cannot find node for uuid {}", uuid);
return null;
}
}
if (node != null) { /**
if (node instanceof InnerNode) { * Given datanode address(Ipaddress or hostname), returns the DatanodeDetails
LOG.warn("Get node for {} return {}, it's an inner node, " + * for the node.
"not a datanode", address, node.getNetworkFullPath()); *
} else { * @param address datanode address
LOG.debug("Get node for {} return {}", address, * @return the given datanode, or null if not found
node.getNetworkFullPath()); */
return (DatanodeDetails)node; @Override
} public DatanodeDetails getNodeByAddress(String address) {
} else { if (Strings.isNullOrEmpty(address)) {
LOG.warn("Cannot find node for {}", address); LOG.warn("address is null");
return null;
} }
String uuid = dnsToUuidMap.get(address);
if (uuid != null) {
DatanodeDetails temp = DatanodeDetails.newBuilder().setUuid(uuid).build();
try {
return nodeStateManager.getNode(temp);
} catch (NodeNotFoundException e) {
LOG.warn("Cannot find node for uuid {}", uuid);
}
}
LOG.warn("Cannot find node for address {}", address);
return null; return null;
} }

View File

@ -57,7 +57,7 @@ public void onMessage(PipelineActionsFromDatanode report,
pipelineID = PipelineID. pipelineID = PipelineID.
getFromProtobuf(action.getClosePipeline().getPipelineID()); getFromProtobuf(action.getClosePipeline().getPipelineID());
Pipeline pipeline = pipelineManager.getPipeline(pipelineID); Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
LOG.info("Received pipeline action {} for {} from datanode [}", LOG.info("Received pipeline action {} for {} from datanode {}",
action.getAction(), pipeline, report.getDatanodeDetails()); action.getAction(), pipeline, report.getDatanodeDetails());
pipelineManager.finalizeAndDestroyPipeline(pipeline, true); pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
} catch (IOException ioe) { } catch (IOException ioe) {

View File

@ -288,12 +288,12 @@ public List<DatanodeDetails> sortDatanodes(List<String> nodes,
boolean auditSuccess = true; boolean auditSuccess = true;
try{ try{
NodeManager nodeManager = scm.getScmNodeManager(); NodeManager nodeManager = scm.getScmNodeManager();
Node client = nodeManager.getNode(clientMachine); Node client = nodeManager.getNodeByAddress(clientMachine);
List<Node> nodeList = new ArrayList(); List<Node> nodeList = new ArrayList();
nodes.stream().forEach(path -> { nodes.stream().forEach(uuid -> {
DatanodeDetails node = nodeManager.getNode(path); DatanodeDetails node = nodeManager.getNodeByUuid(uuid);
if (node != null) { if (node != null) {
nodeList.add(nodeManager.getNode(path)); nodeList.add(node);
} }
}); });
List<? extends Node> sortedNodeList = scm.getClusterMap() List<? extends Node> sortedNodeList = scm.getClusterMap()

View File

@ -265,15 +265,7 @@ public SCMRegisteredResponseProto register(
@VisibleForTesting @VisibleForTesting
public static SCMRegisteredResponseProto getRegisteredResponse( public static SCMRegisteredResponseProto getRegisteredResponse(
RegisteredCommand cmd) { RegisteredCommand cmd) {
return SCMRegisteredResponseProto.newBuilder() return cmd.getProtoBufMessage();
// TODO : Fix this later when we have multiple SCM support.
// .setAddressList(addressList)
.setErrorCode(cmd.getError())
.setClusterID(cmd.getClusterID())
.setDatanodeUUID(cmd.getDatanodeUUID())
.setIpAddress(cmd.getIpAddress())
.setHostname(cmd.getHostName())
.build();
} }
@Override @Override

View File

@ -137,8 +137,10 @@ public static DatanodeDetails createDatanodeDetails(UUID uuid) {
*/ */
public static DatanodeDetails getDatanodeDetails( public static DatanodeDetails getDatanodeDetails(
RegisteredCommand registeredCommand) { RegisteredCommand registeredCommand) {
return createDatanodeDetails(registeredCommand.getDatanodeUUID(), return createDatanodeDetails(
registeredCommand.getHostName(), registeredCommand.getIpAddress(), registeredCommand.getDatanode().getUuidString(),
registeredCommand.getDatanode().getHostName(),
registeredCommand.getDatanode().getIpAddress(),
null); null);
} }

View File

@ -54,6 +54,7 @@
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
@ -87,6 +88,7 @@ public class MockNodeManager implements NodeManager {
private final Node2PipelineMap node2PipelineMap; private final Node2PipelineMap node2PipelineMap;
private final Node2ContainerMap node2ContainerMap; private final Node2ContainerMap node2ContainerMap;
private NetworkTopology clusterMap; private NetworkTopology clusterMap;
private ConcurrentHashMap<String, String> dnsToUuidMap;
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
this.healthyNodes = new LinkedList<>(); this.healthyNodes = new LinkedList<>();
@ -95,6 +97,7 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
this.nodeMetricMap = new HashMap<>(); this.nodeMetricMap = new HashMap<>();
this.node2PipelineMap = new Node2PipelineMap(); this.node2PipelineMap = new Node2PipelineMap();
this.node2ContainerMap = new Node2ContainerMap(); this.node2ContainerMap = new Node2ContainerMap();
this.dnsToUuidMap = new ConcurrentHashMap();
aggregateStat = new SCMNodeStat(); aggregateStat = new SCMNodeStat();
if (initializeFakeNodes) { if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) { for (int x = 0; x < nodeCount; x++) {
@ -370,7 +373,10 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails,
try { try {
node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(), node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(),
Collections.emptySet()); Collections.emptySet());
dnsToUuidMap.put(datanodeDetails.getIpAddress(),
datanodeDetails.getUuidString());
if (clusterMap != null) { if (clusterMap != null) {
datanodeDetails.setNetworkName(datanodeDetails.getUuidString());
clusterMap.add(datanodeDetails); clusterMap.add(datanodeDetails);
} }
} catch (SCMException e) { } catch (SCMException e) {
@ -459,11 +465,16 @@ public List<SCMCommand> getCommandQueue(UUID dnID) {
} }
@Override @Override
public DatanodeDetails getNode(String address) { public DatanodeDetails getNodeByUuid(String uuid) {
Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + address); Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + uuid);
return node == null ? null : (DatanodeDetails)node; return node == null ? null : (DatanodeDetails)node;
} }
@Override
public DatanodeDetails getNodeByAddress(String address) {
return getNodeByUuid(dnsToUuidMap.get(address));
}
public void setNetworkTopology(NetworkTopology topology) { public void setNetworkTopology(NetworkTopology topology) {
this.clusterMap = topology; this.clusterMap = topology;
} }

View File

@ -137,10 +137,6 @@ public void chooseNodeWithNoExcludedNodes() throws SCMException {
datanodeDetails.get(2))); datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1), Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
datanodeDetails.get(2))); datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(3)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2),
datanodeDetails.get(3)));
} }
@Test @Test
@ -188,7 +184,7 @@ public void testFallback() throws SCMException {
// 5 replicas. there are only 3 racks. policy with fallback should // 5 replicas. there are only 3 racks. policy with fallback should
// allocate the 5th datanode though it will break the rack rule(first // allocate the 5th datanode though it will break the rack rule(first
// 2 replicas on same rack, others are different racks). // 2 replicas on same rack, others on different racks).
int nodeNum = 5; int nodeNum = 5;
List<DatanodeDetails> datanodeDetails = List<DatanodeDetails> datanodeDetails =
policy.chooseDatanodes(null, null, nodeNum, 15); policy.chooseDatanodes(null, null, nodeNum, 15);
@ -199,10 +195,6 @@ public void testFallback() throws SCMException {
datanodeDetails.get(2))); datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1), Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1),
datanodeDetails.get(2))); datanodeDetails.get(2)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0),
datanodeDetails.get(3)));
Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2),
datanodeDetails.get(3)));
} }

View File

@ -1152,10 +1152,10 @@ private void testScmRegisterNodeWithNetworkTopology(boolean useHostname)
// test get node // test get node
if (useHostname) { if (useHostname) {
Arrays.stream(hostNames).forEach(hostname -> Arrays.stream(hostNames).forEach(hostname ->
Assert.assertNotNull(nodeManager.getNode(hostname))); Assert.assertNotNull(nodeManager.getNodeByAddress(hostname)));
} else { } else {
Arrays.stream(ipAddress).forEach(ip -> Arrays.stream(ipAddress).forEach(ip ->
Assert.assertNotNull(nodeManager.getNode(ip))); Assert.assertNotNull(nodeManager.getNodeByAddress(ip)));
} }
} }
} }

View File

@ -35,6 +35,7 @@
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.UUID;
/** /**
* Test class for @{@link SCMBlockProtocolServer}. * Test class for @{@link SCMBlockProtocolServer}.
@ -105,8 +106,8 @@ public void testSortDatanodes() throws Exception {
node -> System.out.println(node.toString())); node -> System.out.println(node.toString()));
Assert.assertTrue(datanodeDetails.size() == nodeCount); Assert.assertTrue(datanodeDetails.size() == nodeCount);
// illegal nodes to sort 1 // unknown node to sort
nodes.add("/default-rack"); nodes.add(UUID.randomUUID().toString());
ScmBlockLocationProtocolProtos.SortDatanodesRequestProto request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto request =
ScmBlockLocationProtocolProtos.SortDatanodesRequestProto ScmBlockLocationProtocolProtos.SortDatanodesRequestProto
.newBuilder() .newBuilder()
@ -120,25 +121,11 @@ public void testSortDatanodes() throws Exception {
resp.getNodeList().stream().forEach( resp.getNodeList().stream().forEach(
node -> System.out.println(node.getNetworkName())); node -> System.out.println(node.getNetworkName()));
// illegal nodes to sort 2 // all unknown nodes
nodes.remove("/default-rack");
nodes.add(nodes.get(0) + "X");
request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto
.newBuilder()
.addAllNodeNetworkName(nodes)
.setClient(client)
.build();
resp = service.sortDatanodes(request);
Assert.assertTrue(resp.getNodeList().size() == nodeCount);
System.out.println("client = " + client);
resp.getNodeList().stream().forEach(
node -> System.out.println(node.getNetworkName()));
// all illegal nodes
nodes.clear(); nodes.clear();
nodes.add("/default-rack"); nodes.add(UUID.randomUUID().toString());
nodes.add("/default-rack-1"); nodes.add(UUID.randomUUID().toString());
nodes.add("/default-rack-2"); nodes.add(UUID.randomUUID().toString());
request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto
.newBuilder() .newBuilder()
.addAllNodeNetworkName(nodes) .addAllNodeNetworkName(nodes)

View File

@ -311,7 +311,12 @@ public List<SCMCommand> getCommandQueue(UUID dnID) {
} }
@Override @Override
public DatanodeDetails getNode(String address) { public DatanodeDetails getNodeByUuid(String address) {
return null;
}
@Override
public DatanodeDetails getNodeByAddress(String address) {
return null; return null;
} }
} }

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.ozone; package org.apache.hadoop.ozone;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -41,7 +43,9 @@
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
@ -58,11 +62,14 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@ -72,6 +79,7 @@
import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.HddsVersionInfo; import org.apache.hadoop.utils.HddsVersionInfo;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
@ -483,6 +491,53 @@ public void testScmInfo() throws Exception {
Assert.assertEquals(expectedVersion, actualVersion); Assert.assertEquals(expectedVersion, actualVersion);
} }
/**
* Test datanode heartbeat well processed with a 4-layer network topology.
*/
@Test(timeout = 60000)
public void testScmProcessDatanodeHeartbeat() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
String scmId = UUID.randomUUID().toString();
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
StaticMapping.addNodeToRack(HddsUtils.getHostName(conf), "/rack1");
final int datanodeNum = 3;
MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(datanodeNum)
.setScmId(scmId)
.build();
cluster.waitForClusterToBeReady();
StorageContainerManager scm = cluster.getStorageContainerManager();
try {
// first sleep 10s
Thread.sleep(10000);
// verify datanode heartbeats are well processed
long heartbeatCheckerIntervalMs =
MiniOzoneCluster.Builder.DEFAULT_HB_INTERVAL_MS;
long start = Time.monotonicNow();
Thread.sleep(heartbeatCheckerIntervalMs * 2);
List<DatanodeDetails> allNodes = scm.getScmNodeManager().getAllNodes();
Assert.assertTrue(allNodes.size() == datanodeNum);
for (int i = 0; i < allNodes.size(); i++) {
DatanodeInfo datanodeInfo = (DatanodeInfo) scm.getScmNodeManager()
.getNodeByUuid(allNodes.get(i).getUuidString());
Assert.assertTrue((datanodeInfo.getLastHeartbeatTime() - start)
>= heartbeatCheckerIntervalMs);
Assert.assertTrue(datanodeInfo.getUuidString()
.equals(datanodeInfo.getNetworkName()));
Assert.assertTrue(datanodeInfo.getNetworkLocation()
.equals("/rack1"));
}
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test @Test
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testCloseContainerCommandOnRestart() throws Exception { public void testCloseContainerCommandOnRestart() throws Exception {

View File

@ -138,7 +138,10 @@ public static void setUp() throws Exception {
NodeSchemaManager schemaManager = NodeSchemaManager.getInstance(); NodeSchemaManager schemaManager = NodeSchemaManager.getInstance();
schemaManager.init(schemas, false); schemaManager.init(schemas, false);
NetworkTopology clusterMap = new NetworkTopologyImpl(schemaManager); NetworkTopology clusterMap = new NetworkTopologyImpl(schemaManager);
nodeManager.getAllNodes().stream().forEach(node -> clusterMap.add(node)); nodeManager.getAllNodes().stream().forEach(node -> {
node.setNetworkName(node.getUuidString());
clusterMap.add(node);
});
((MockNodeManager)nodeManager).setNetworkTopology(clusterMap); ((MockNodeManager)nodeManager).setNetworkTopology(clusterMap);
SCMConfigurator configurator = new SCMConfigurator(); SCMConfigurator configurator = new SCMConfigurator();
configurator.setScmNodeManager(nodeManager); configurator.setScmNodeManager(nodeManager);
@ -696,17 +699,17 @@ public void testLookupKeyWithLocation() throws IOException {
Assert.assertNotEquals(follower1, follower2); Assert.assertNotEquals(follower1, follower2);
// lookup key, leader as client // lookup key, leader as client
OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getNetworkName()); OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getIpAddress());
Assert.assertEquals(leader, key1.getLatestVersionLocations() Assert.assertEquals(leader, key1.getLatestVersionLocations()
.getLocationList().get(0).getPipeline().getClosestNode()); .getLocationList().get(0).getPipeline().getClosestNode());
// lookup key, follower1 as client // lookup key, follower1 as client
OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getNetworkName()); OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getIpAddress());
Assert.assertEquals(follower1, key2.getLatestVersionLocations() Assert.assertEquals(follower1, key2.getLatestVersionLocations()
.getLocationList().get(0).getPipeline().getClosestNode()); .getLocationList().get(0).getPipeline().getClosestNode());
// lookup key, follower2 as client // lookup key, follower2 as client
OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getNetworkName()); OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getIpAddress());
Assert.assertEquals(follower2, key3.getLatestVersionLocations() Assert.assertEquals(follower2, key3.getLatestVersionLocations()
.getLocationList().get(0).getPipeline().getClosestNode()); .getLocationList().get(0).getPipeline().getClosestNode());

View File

@ -2123,9 +2123,14 @@ private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) {
for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) { for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) {
key.getLocationList().forEach(k -> { key.getLocationList().forEach(k -> {
List<DatanodeDetails> nodes = k.getPipeline().getNodes(); List<DatanodeDetails> nodes = k.getPipeline().getNodes();
if (nodes == null || nodes.size() == 0) {
LOG.warn("Datanodes for pipeline {} is empty",
k.getPipeline().getId().toString());
return;
}
List<String> nodeList = new ArrayList<>(); List<String> nodeList = new ArrayList<>();
nodes.stream().forEach(node -> nodes.stream().forEach(node ->
nodeList.add(node.getNetworkName())); nodeList.add(node.getUuidString()));
try { try {
List<DatanodeDetails> sortedNodes = scmClient.getBlockClient() List<DatanodeDetails> sortedNodes = scmClient.getBlockClient()
.sortDatanodes(nodeList, clientMachine); .sortDatanodes(nodeList, clientMachine);