From 69a46a95bb6ab953d4bccdb133565667daca5c96 Mon Sep 17 00:00:00 2001 From: Sammi Chen Date: Sat, 20 Jul 2019 06:45:26 +0800 Subject: [PATCH] =?UTF-8?q?HDDS-1713.=20ReplicationManager=20fail=20to=20f?= =?UTF-8?q?ind=20proper=20node=20topology=20based=E2=80=A6=20(#1112)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hadoop/hdds/protocol/DatanodeDetails.java | 15 ++- .../states/endpoint/RegisterEndpointTask.java | 4 + .../protocol/commands/RegisteredCommand.java | 121 +++++------------- .../StorageContainerDatanodeProtocol.proto | 2 + .../hadoop/hdds/scm/node/NodeManager.java | 16 ++- .../hadoop/hdds/scm/node/SCMNodeManager.java | 83 +++++++----- .../scm/pipeline/PipelineActionHandler.java | 2 +- .../scm/server/SCMBlockProtocolServer.java | 8 +- .../scm/server/SCMDatanodeProtocolServer.java | 10 +- .../org/apache/hadoop/hdds/scm/TestUtils.java | 6 +- .../hdds/scm/container/MockNodeManager.java | 15 ++- .../TestSCMContainerPlacementRackAware.java | 10 +- .../hdds/scm/node/TestSCMNodeManager.java | 4 +- .../server/TestSCMBlockProtocolServer.java | 27 +--- .../testutils/ReplicationNodeManagerMock.java | 7 +- .../ozone/TestStorageContainerManager.java | 55 ++++++++ .../hadoop/ozone/om/TestKeyManagerImpl.java | 11 +- .../hadoop/ozone/om/KeyManagerImpl.java | 7 +- 18 files changed, 220 insertions(+), 183 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index 31e4df03e3..698a443fc6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.protocol; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -193,12 +194,12 @@ public static DatanodeDetails getFromProtoBuf( builder.addPort(newPort( Port.Name.valueOf(port.getName().toUpperCase()), port.getValue())); } - if (datanodeDetailsProto.hasNetworkLocation()) { - builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation()); - } if (datanodeDetailsProto.hasNetworkName()) { builder.setNetworkName(datanodeDetailsProto.getNetworkName()); } + if (datanodeDetailsProto.hasNetworkLocation()) { + builder.setNetworkLocation(datanodeDetailsProto.getNetworkLocation()); + } return builder.build(); } @@ -219,8 +220,12 @@ public HddsProtos.DatanodeDetailsProto getProtoBufMessage() { if (certSerialId != null) { builder.setCertSerialId(certSerialId); } - builder.setNetworkLocation(getNetworkLocation()); - builder.setNetworkName(getNetworkName()); + if (!Strings.isNullOrEmpty(getNetworkName())) { + builder.setNetworkName(getNetworkName()); + } + if (!Strings.isNullOrEmpty(getNetworkLocation())) { + builder.setNetworkLocation(getNetworkLocation()); + } for (Port port : ports) { builder.addPorts(HddsProtos.Port.newBuilder() diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java index 9918f9dfda..b94b1cfc85 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/RegisterEndpointTask.java @@ -128,6 +128,10 @@ public EndpointStateMachine.EndPointStates call() throws Exception { datanodeDetails.setHostName(response.getHostname()); datanodeDetails.setIpAddress(response.getIpAddress()); } + if (response.hasNetworkName() && response.hasNetworkLocation()) { + datanodeDetails.setNetworkName(response.getNetworkName()); + datanodeDetails.setNetworkLocation(response.getNetworkLocation()); + } EndpointStateMachine.EndPointStates nextState = rpcEndPoint.getState().getNextState(); rpcEndPoint.setState(nextState); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java index 3a5da72f48..42778cb6e4 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RegisteredCommand.java @@ -17,7 +17,8 @@ */ package org.apache.hadoop.ozone.protocol.commands; -import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto; import org.apache.hadoop.hdds.protocol.proto @@ -28,23 +29,15 @@ * Response to Datanode Register call. */ public class RegisteredCommand { - private String datanodeUUID; private String clusterID; private ErrorCode error; - private String hostname; - private String ipAddress; + private DatanodeDetails datanode; - public RegisteredCommand(final ErrorCode error, final String datanodeUUID, + public RegisteredCommand(final ErrorCode error, final DatanodeDetails node, final String clusterID) { - this(error, datanodeUUID, clusterID, null, null); - } - public RegisteredCommand(final ErrorCode error, final String datanodeUUID, - final String clusterID, final String hostname, final String ipAddress) { - this.datanodeUUID = datanodeUUID; + this.datanode = node; this.clusterID = clusterID; this.error = error; - this.hostname = hostname; - this.ipAddress = ipAddress; } /** @@ -57,12 +50,12 @@ public static Builder newBuilder() { } /** - * Returns datanode UUID. + * Returns datanode. * - * @return - Datanode ID. + * @return - Datanode. */ - public String getDatanodeUUID() { - return datanodeUUID; + public DatanodeDetails getDatanode() { + return datanode; } /** @@ -83,79 +76,54 @@ public ErrorCode getError() { return error; } - /** - * Returns the hostname. - * - * @return - hostname - */ - public String getHostName() { - return hostname; - } - - /** - * Returns the ipAddress of the dataNode. - */ - public String getIpAddress() { - return ipAddress; - } - /** * Gets the protobuf message of this object. * * @return A protobuf message. */ - public byte[] getProtoBufMessage() { + public SCMRegisteredResponseProto getProtoBufMessage() { SCMRegisteredResponseProto.Builder builder = SCMRegisteredResponseProto.newBuilder() + // TODO : Fix this later when we have multiple SCM support. + // .setAddressList(addressList) .setClusterID(this.clusterID) - .setDatanodeUUID(this.datanodeUUID) + .setDatanodeUUID(this.datanode.getUuidString()) .setErrorCode(this.error); - if (hostname != null && ipAddress != null) { - builder.setHostname(hostname).setIpAddress(ipAddress); + if (!Strings.isNullOrEmpty(datanode.getHostName())) { + builder.setHostname(datanode.getHostName()); } - return builder.build().toByteArray(); + if (!Strings.isNullOrEmpty(datanode.getIpAddress())) { + builder.setIpAddress(datanode.getIpAddress()); + } + if (!Strings.isNullOrEmpty(datanode.getNetworkName())) { + builder.setNetworkName(datanode.getNetworkName()); + } + if (!Strings.isNullOrEmpty(datanode.getNetworkLocation())) { + builder.setNetworkLocation(datanode.getNetworkLocation()); + } + + return builder.build(); } /** * A builder class to verify all values are sane. */ public static class Builder { - private String datanodeUUID; + private DatanodeDetails datanode; private String clusterID; private ErrorCode error; - private String ipAddress; - private String hostname; /** - * sets UUID. + * sets datanode details. * - * @param dnUUID - datanode UUID + * @param node - datanode details * @return Builder */ - public Builder setDatanodeUUID(String dnUUID) { - this.datanodeUUID = dnUUID; + public Builder setDatanode(DatanodeDetails node) { + this.datanode = node; return this; } - /** - * Create this object from a Protobuf message. - * - * @param response - RegisteredCmdResponseProto - * @return RegisteredCommand - */ - public RegisteredCommand getFromProtobuf(SCMRegisteredResponseProto - response) { - Preconditions.checkNotNull(response); - if (response.hasHostname() && response.hasIpAddress()) { - return new RegisteredCommand(response.getErrorCode(), - response.getDatanodeUUID(), response.getClusterID(), - response.getHostname(), response.getIpAddress()); - } else { - return new RegisteredCommand(response.getErrorCode(), - response.getDatanodeUUID(), response.getClusterID()); - } - } - /** * Sets cluster ID. * @@ -178,38 +146,19 @@ public Builder setErrorCode(ErrorCode errorCode) { return this; } - /** - * sets the hostname. - */ - public Builder setHostname(String host) { - this.hostname = host; - return this; - } - - public Builder setIpAddress(String ipAddr) { - this.ipAddress = ipAddr; - return this; - } - /** * Build the command object. * * @return RegisteredCommand */ public RegisteredCommand build() { - if ((this.error == ErrorCode.success) && (this.datanodeUUID == null - || this.datanodeUUID.isEmpty()) || (this.clusterID == null - || this.clusterID.isEmpty())) { + if ((this.error == ErrorCode.success) && (this.datanode == null + || Strings.isNullOrEmpty(this.datanode.getUuidString()) + || Strings.isNullOrEmpty(this.clusterID))) { throw new IllegalArgumentException("On success, RegisteredCommand " + "needs datanodeUUID and ClusterID."); } - if (hostname != null && ipAddress != null) { - return new RegisteredCommand(this.error, this.datanodeUUID, - this.clusterID, this.hostname, this.ipAddress); - } else { - return new RegisteredCommand(this.error, this.datanodeUUID, - this.clusterID); - } + return new RegisteredCommand(this.error, this.datanode, this.clusterID); } } } diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto index 73dc1cc533..500735a35c 100644 --- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -70,6 +70,8 @@ message SCMRegisteredResponseProto { optional SCMNodeAddressList addressList = 4; optional string hostname = 5; optional string ipAddress = 6; + optional string networkName = 7; + optional string networkLocation = 8; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 6b05772cc8..0ccbb82d43 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -173,11 +173,19 @@ void processNodeReport(DatanodeDetails datanodeDetails, List getCommandQueue(UUID dnID); /** - * Given datanode host address, returns the DatanodeDetails for the - * node. + * Given datanode uuid, returns the DatanodeDetails for the node. * - * @param address node host address + * @param uuid datanode uuid * @return the given datanode, or null if not found */ - DatanodeDetails getNode(String address); + DatanodeDetails getNodeByUuid(String uuid); + + /** + * Given datanode address(Ipaddress or hostname), returns the DatanodeDetails + * for the node. + * + * @param address datanode address + * @return the given datanode, or null if not found + */ + DatanodeDetails getNodeByAddress(String address); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index e4e9d35bdf..93ca304119 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -19,15 +19,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.net.InnerNode; -import org.apache.hadoop.hdds.scm.net.NetConstants; import org.apache.hadoop.hdds.scm.net.NetworkTopology; -import org.apache.hadoop.hdds.scm.net.Node; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; @@ -74,6 +72,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.stream.Collectors; @@ -106,6 +105,8 @@ public class SCMNodeManager implements NodeManager { private final NetworkTopology clusterMap; private final DNSToSwitchMapping dnsToSwitchMapping; private final boolean useHostname; + private final ConcurrentHashMap dnsToUuidMap = + new ConcurrentHashMap<>(); /** * Constructs SCM machine Manager. @@ -252,19 +253,21 @@ public RegisteredCommand register( datanodeDetails.setIpAddress(dnAddress.getHostAddress()); } try { - String location; + String dnsName; + String networkLocation; + datanodeDetails.setNetworkName(datanodeDetails.getUuidString()); if (useHostname) { - datanodeDetails.setNetworkName(datanodeDetails.getHostName()); - location = nodeResolve(datanodeDetails.getHostName()); + dnsName = datanodeDetails.getHostName(); } else { - datanodeDetails.setNetworkName(datanodeDetails.getIpAddress()); - location = nodeResolve(datanodeDetails.getIpAddress()); + dnsName = datanodeDetails.getIpAddress(); } - if (location != null) { - datanodeDetails.setNetworkLocation(location); + networkLocation = nodeResolve(dnsName); + if (networkLocation != null) { + datanodeDetails.setNetworkLocation(networkLocation); } nodeStateManager.addNode(datanodeDetails); clusterMap.add(datanodeDetails); + dnsToUuidMap.put(dnsName, datanodeDetails.getUuidString()); // Updating Node Report, as registration is successful processNodeReport(datanodeDetails, nodeReport); LOG.info("Registered Data node : {}", datanodeDetails); @@ -274,10 +277,8 @@ public RegisteredCommand register( } return RegisteredCommand.newBuilder().setErrorCode(ErrorCode.success) - .setDatanodeUUID(datanodeDetails.getUuidString()) + .setDatanode(datanodeDetails) .setClusterID(this.scmStorageConfig.getClusterID()) - .setHostname(datanodeDetails.getHostName()) - .setIpAddress(datanodeDetails.getIpAddress()) .build(); } @@ -553,33 +554,49 @@ public List getCommandQueue(UUID dnID) { } /** - * Given datanode address or host name, returns the DatanodeDetails for the - * node. + * Given datanode uuid, returns the DatanodeDetails for the node. * - * @param address node host address + * @param uuid node host address * @return the given datanode, or null if not found */ @Override - public DatanodeDetails getNode(String address) { - Node node = null; - String location = nodeResolve(address); - if (location != null) { - node = clusterMap.getNode(location + NetConstants.PATH_SEPARATOR_STR + - address); + public DatanodeDetails getNodeByUuid(String uuid) { + if (Strings.isNullOrEmpty(uuid)) { + LOG.warn("uuid is null"); + return null; } + DatanodeDetails temp = DatanodeDetails.newBuilder().setUuid(uuid).build(); + try { + return nodeStateManager.getNode(temp); + } catch (NodeNotFoundException e) { + LOG.warn("Cannot find node for uuid {}", uuid); + return null; + } + } - if (node != null) { - if (node instanceof InnerNode) { - LOG.warn("Get node for {} return {}, it's an inner node, " + - "not a datanode", address, node.getNetworkFullPath()); - } else { - LOG.debug("Get node for {} return {}", address, - node.getNetworkFullPath()); - return (DatanodeDetails)node; - } - } else { - LOG.warn("Cannot find node for {}", address); + /** + * Given datanode address(Ipaddress or hostname), returns the DatanodeDetails + * for the node. + * + * @param address datanode address + * @return the given datanode, or null if not found + */ + @Override + public DatanodeDetails getNodeByAddress(String address) { + if (Strings.isNullOrEmpty(address)) { + LOG.warn("address is null"); + return null; } + String uuid = dnsToUuidMap.get(address); + if (uuid != null) { + DatanodeDetails temp = DatanodeDetails.newBuilder().setUuid(uuid).build(); + try { + return nodeStateManager.getNode(temp); + } catch (NodeNotFoundException e) { + LOG.warn("Cannot find node for uuid {}", uuid); + } + } + LOG.warn("Cannot find node for address {}", address); return null; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java index da704d24af..955bfc6eed 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java @@ -57,7 +57,7 @@ public void onMessage(PipelineActionsFromDatanode report, pipelineID = PipelineID. getFromProtobuf(action.getClosePipeline().getPipelineID()); Pipeline pipeline = pipelineManager.getPipeline(pipelineID); - LOG.info("Received pipeline action {} for {} from datanode [}", + LOG.info("Received pipeline action {} for {} from datanode {}", action.getAction(), pipeline, report.getDatanodeDetails()); pipelineManager.finalizeAndDestroyPipeline(pipeline, true); } catch (IOException ioe) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java index 1c76070d3b..35ec2954ce 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java @@ -288,12 +288,12 @@ public List sortDatanodes(List nodes, boolean auditSuccess = true; try{ NodeManager nodeManager = scm.getScmNodeManager(); - Node client = nodeManager.getNode(clientMachine); + Node client = nodeManager.getNodeByAddress(clientMachine); List nodeList = new ArrayList(); - nodes.stream().forEach(path -> { - DatanodeDetails node = nodeManager.getNode(path); + nodes.stream().forEach(uuid -> { + DatanodeDetails node = nodeManager.getNodeByUuid(uuid); if (node != null) { - nodeList.add(nodeManager.getNode(path)); + nodeList.add(node); } }); List sortedNodeList = scm.getClusterMap() diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index cd78d3d17f..6dd9dab827 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -265,15 +265,7 @@ public SCMRegisteredResponseProto register( @VisibleForTesting public static SCMRegisteredResponseProto getRegisteredResponse( RegisteredCommand cmd) { - return SCMRegisteredResponseProto.newBuilder() - // TODO : Fix this later when we have multiple SCM support. - // .setAddressList(addressList) - .setErrorCode(cmd.getError()) - .setClusterID(cmd.getClusterID()) - .setDatanodeUUID(cmd.getDatanodeUUID()) - .setIpAddress(cmd.getIpAddress()) - .setHostname(cmd.getHostName()) - .build(); + return cmd.getProtoBufMessage(); } @Override diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index e2ce7de047..37321d7699 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -137,8 +137,10 @@ public static DatanodeDetails createDatanodeDetails(UUID uuid) { */ public static DatanodeDetails getDatanodeDetails( RegisteredCommand registeredCommand) { - return createDatanodeDetails(registeredCommand.getDatanodeUUID(), - registeredCommand.getHostName(), registeredCommand.getIpAddress(), + return createDatanodeDetails( + registeredCommand.getDatanode().getUuidString(), + registeredCommand.getDatanode().getHostName(), + registeredCommand.getDatanode().getIpAddress(), null); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 64eba29ed9..b85daa25dd 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -54,6 +54,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState @@ -87,6 +88,7 @@ public class MockNodeManager implements NodeManager { private final Node2PipelineMap node2PipelineMap; private final Node2ContainerMap node2ContainerMap; private NetworkTopology clusterMap; + private ConcurrentHashMap dnsToUuidMap; public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.healthyNodes = new LinkedList<>(); @@ -95,6 +97,7 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { this.nodeMetricMap = new HashMap<>(); this.node2PipelineMap = new Node2PipelineMap(); this.node2ContainerMap = new Node2ContainerMap(); + this.dnsToUuidMap = new ConcurrentHashMap(); aggregateStat = new SCMNodeStat(); if (initializeFakeNodes) { for (int x = 0; x < nodeCount; x++) { @@ -370,7 +373,10 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails, try { node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(), Collections.emptySet()); + dnsToUuidMap.put(datanodeDetails.getIpAddress(), + datanodeDetails.getUuidString()); if (clusterMap != null) { + datanodeDetails.setNetworkName(datanodeDetails.getUuidString()); clusterMap.add(datanodeDetails); } } catch (SCMException e) { @@ -459,11 +465,16 @@ public List getCommandQueue(UUID dnID) { } @Override - public DatanodeDetails getNode(String address) { - Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + address); + public DatanodeDetails getNodeByUuid(String uuid) { + Node node = clusterMap.getNode(NetConstants.DEFAULT_RACK + "/" + uuid); return node == null ? null : (DatanodeDetails)node; } + @Override + public DatanodeDetails getNodeByAddress(String address) { + return getNodeByUuid(dnsToUuidMap.get(address)); + } + public void setNetworkTopology(NetworkTopology topology) { this.clusterMap = topology; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java index e63b09e528..3bd5f55945 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java @@ -137,10 +137,6 @@ public void chooseNodeWithNoExcludedNodes() throws SCMException { datanodeDetails.get(2))); Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1), datanodeDetails.get(2))); - Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), - datanodeDetails.get(3))); - Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2), - datanodeDetails.get(3))); } @Test @@ -188,7 +184,7 @@ public void testFallback() throws SCMException { // 5 replicas. there are only 3 racks. policy with fallback should // allocate the 5th datanode though it will break the rack rule(first - // 2 replicas on same rack, others are different racks). + // 2 replicas on same rack, others on different racks). int nodeNum = 5; List datanodeDetails = policy.chooseDatanodes(null, null, nodeNum, 15); @@ -199,10 +195,6 @@ public void testFallback() throws SCMException { datanodeDetails.get(2))); Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(1), datanodeDetails.get(2))); - Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(0), - datanodeDetails.get(3))); - Assert.assertFalse(cluster.isSameParent(datanodeDetails.get(2), - datanodeDetails.get(3))); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 4657fa03fe..d028851168 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@ -1152,10 +1152,10 @@ private void testScmRegisterNodeWithNetworkTopology(boolean useHostname) // test get node if (useHostname) { Arrays.stream(hostNames).forEach(hostname -> - Assert.assertNotNull(nodeManager.getNode(hostname))); + Assert.assertNotNull(nodeManager.getNodeByAddress(hostname))); } else { Arrays.stream(ipAddress).forEach(ip -> - Assert.assertNotNull(nodeManager.getNode(ip))); + Assert.assertNotNull(nodeManager.getNodeByAddress(ip))); } } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java index ba78f4c9df..25b5b9bfc7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java @@ -35,6 +35,7 @@ import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.UUID; /** * Test class for @{@link SCMBlockProtocolServer}. @@ -105,8 +106,8 @@ public void testSortDatanodes() throws Exception { node -> System.out.println(node.toString())); Assert.assertTrue(datanodeDetails.size() == nodeCount); - // illegal nodes to sort 1 - nodes.add("/default-rack"); + // unknown node to sort + nodes.add(UUID.randomUUID().toString()); ScmBlockLocationProtocolProtos.SortDatanodesRequestProto request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto .newBuilder() @@ -120,25 +121,11 @@ public void testSortDatanodes() throws Exception { resp.getNodeList().stream().forEach( node -> System.out.println(node.getNetworkName())); - // illegal nodes to sort 2 - nodes.remove("/default-rack"); - nodes.add(nodes.get(0) + "X"); - request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto - .newBuilder() - .addAllNodeNetworkName(nodes) - .setClient(client) - .build(); - resp = service.sortDatanodes(request); - Assert.assertTrue(resp.getNodeList().size() == nodeCount); - System.out.println("client = " + client); - resp.getNodeList().stream().forEach( - node -> System.out.println(node.getNetworkName())); - - // all illegal nodes + // all unknown nodes nodes.clear(); - nodes.add("/default-rack"); - nodes.add("/default-rack-1"); - nodes.add("/default-rack-2"); + nodes.add(UUID.randomUUID().toString()); + nodes.add(UUID.randomUUID().toString()); + nodes.add(UUID.randomUUID().toString()); request = ScmBlockLocationProtocolProtos.SortDatanodesRequestProto .newBuilder() .addAllNodeNetworkName(nodes) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index bc26e3c397..aa5dc2fd46 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -311,7 +311,12 @@ public List getCommandQueue(UUID dnID) { } @Override - public DatanodeDetails getNode(String address) { + public DatanodeDetails getNodeByUuid(String address) { + return null; + } + + @Override + public DatanodeDetails getNodeByAddress(String address) { return null; } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java index 3eb1ebd077..3ac59939e6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.ozone; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic + .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.junit.Assert.fail; @@ -41,7 +43,9 @@ import org.apache.commons.lang3.RandomUtils; import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType; @@ -58,11 +62,14 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.DatanodeInfo; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.ozone.container.ContainerTestHelper; import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; @@ -72,6 +79,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.Time; import org.apache.hadoop.utils.HddsVersionInfo; import org.junit.Assert; import org.junit.Rule; @@ -483,6 +491,53 @@ public void testScmInfo() throws Exception { Assert.assertEquals(expectedVersion, actualVersion); } + /** + * Test datanode heartbeat well processed with a 4-layer network topology. + */ + @Test(timeout = 60000) + public void testScmProcessDatanodeHeartbeat() throws Exception { + OzoneConfiguration conf = new OzoneConfiguration(); + String scmId = UUID.randomUUID().toString(); + conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, + StaticMapping.class, DNSToSwitchMapping.class); + StaticMapping.addNodeToRack(HddsUtils.getHostName(conf), "/rack1"); + + final int datanodeNum = 3; + MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf) + .setNumDatanodes(datanodeNum) + .setScmId(scmId) + .build(); + cluster.waitForClusterToBeReady(); + StorageContainerManager scm = cluster.getStorageContainerManager(); + + try { + // first sleep 10s + Thread.sleep(10000); + // verify datanode heartbeats are well processed + long heartbeatCheckerIntervalMs = + MiniOzoneCluster.Builder.DEFAULT_HB_INTERVAL_MS; + long start = Time.monotonicNow(); + Thread.sleep(heartbeatCheckerIntervalMs * 2); + + List allNodes = scm.getScmNodeManager().getAllNodes(); + Assert.assertTrue(allNodes.size() == datanodeNum); + for (int i = 0; i < allNodes.size(); i++) { + DatanodeInfo datanodeInfo = (DatanodeInfo) scm.getScmNodeManager() + .getNodeByUuid(allNodes.get(i).getUuidString()); + Assert.assertTrue((datanodeInfo.getLastHeartbeatTime() - start) + >= heartbeatCheckerIntervalMs); + Assert.assertTrue(datanodeInfo.getUuidString() + .equals(datanodeInfo.getNetworkName())); + Assert.assertTrue(datanodeInfo.getNetworkLocation() + .equals("/rack1")); + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + @Test @SuppressWarnings("unchecked") public void testCloseContainerCommandOnRestart() throws Exception { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index b555966ba0..83eb78f3b1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -138,7 +138,10 @@ public static void setUp() throws Exception { NodeSchemaManager schemaManager = NodeSchemaManager.getInstance(); schemaManager.init(schemas, false); NetworkTopology clusterMap = new NetworkTopologyImpl(schemaManager); - nodeManager.getAllNodes().stream().forEach(node -> clusterMap.add(node)); + nodeManager.getAllNodes().stream().forEach(node -> { + node.setNetworkName(node.getUuidString()); + clusterMap.add(node); + }); ((MockNodeManager)nodeManager).setNetworkTopology(clusterMap); SCMConfigurator configurator = new SCMConfigurator(); configurator.setScmNodeManager(nodeManager); @@ -696,17 +699,17 @@ public void testLookupKeyWithLocation() throws IOException { Assert.assertNotEquals(follower1, follower2); // lookup key, leader as client - OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getNetworkName()); + OmKeyInfo key1 = keyManager.lookupKey(keyArgs, leader.getIpAddress()); Assert.assertEquals(leader, key1.getLatestVersionLocations() .getLocationList().get(0).getPipeline().getClosestNode()); // lookup key, follower1 as client - OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getNetworkName()); + OmKeyInfo key2 = keyManager.lookupKey(keyArgs, follower1.getIpAddress()); Assert.assertEquals(follower1, key2.getLatestVersionLocations() .getLocationList().get(0).getPipeline().getClosestNode()); // lookup key, follower2 as client - OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getNetworkName()); + OmKeyInfo key3 = keyManager.lookupKey(keyArgs, follower2.getIpAddress()); Assert.assertEquals(follower2, key3.getLatestVersionLocations() .getLocationList().get(0).getPipeline().getClosestNode()); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 7db4a80ebf..9b4eac3d6b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -2123,9 +2123,14 @@ private void sortDatanodeInPipeline(OmKeyInfo keyInfo, String clientMachine) { for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) { key.getLocationList().forEach(k -> { List nodes = k.getPipeline().getNodes(); + if (nodes == null || nodes.size() == 0) { + LOG.warn("Datanodes for pipeline {} is empty", + k.getPipeline().getId().toString()); + return; + } List nodeList = new ArrayList<>(); nodes.stream().forEach(node -> - nodeList.add(node.getNetworkName())); + nodeList.add(node.getUuidString())); try { List sortedNodes = scmClient.getBlockClient() .sortDatanodes(nodeList, clientMachine);