From ae5242accbbd47e44adada58958ce7216ff092b3 Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Sat, 20 May 2017 10:28:12 +0800 Subject: [PATCH] HDFS-11830. Ozone: Datanode needs to re-register to SCM if SCM is restarted. Contributed by Weiwei Yang. --- .../endpoint/HeartbeatEndpointTask.java | 26 ++++++-- .../protocol/commands/ReregisterCommand.java | 59 +++++++++++++++++++ .../ozone/scm/StorageContainerManager.java | 7 +++ .../hadoop/ozone/scm/node/CommandQueue.java | 11 +--- .../hadoop/ozone/scm/node/SCMNodeManager.java | 5 +- .../StorageContainerDatanodeProtocol.proto | 9 ++- .../ozone/scm/node/TestNodeManager.java | 47 +++++++++++++++ 7 files changed, 149 insertions(+), 15 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index c0d301ebbb..764e6d2a46 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -23,12 +23,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine; +import org.apache.hadoop.ozone.container.common.statemachine + .EndpointStateMachine.EndPointStates; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto; import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand; -import org.apache.hadoop.ozone.protocol.proto - .StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto; import org.apache.hadoop.ozone.protocol.proto @@ -125,10 +125,28 @@ public static Builder newBuilder() { private void processResponse(SCMHeartbeatResponseProto response) { for (SCMCommandResponseProto commandResponseProto : response .getCommandsList()) { - if (commandResponseProto.getCmdType() == - StorageContainerDatanodeProtocolProtos.Type.sendContainerReport) { + switch (commandResponseProto.getCmdType()) { + case sendContainerReport: this.context.addCommand(SendContainerCommand.getFromProtobuf( commandResponseProto.getSendReport())); + break; + case reregisterCommand: + if (rpcEndpoint.getState() == EndPointStates.HEARTBEAT) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received SCM notification to register." + + " Interrupt HEARTBEAT and transit to REGISTER state."); + } + rpcEndpoint.setState(EndPointStates.REGISTER); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Illegal state {} found, expecting {}.", + rpcEndpoint.getState().name(), EndPointStates.HEARTBEAT); + } + } + break; + default: + throw new IllegalArgumentException("Unknown response : " + + commandResponseProto.getCmdType().name()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java new file mode 100644 index 0000000000..f87d35b5ee --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.protocol.commands; + +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type; + +import static org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; +import static org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type.reregisterCommand; + +/** + * Informs a datanode to register itself with SCM again. + */ +public class ReregisterCommand extends + SCMCommand{ + + /** + * Returns the type of this command. + * + * @return Type + */ + @Override + public Type getType() { + return reregisterCommand; + } + + /** + * Gets the protobuf message of this object. + * + * @return A protobuf message. + */ + @Override + public byte[] getProtoBufMessage() { + return getProto().toByteArray(); + } + + public SCMReregisterCmdResponseProto getProto() { + return SCMReregisterCmdResponseProto + .newBuilder() + .build(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java index 2dfe53d05a..c12b556545 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java @@ -68,6 +68,8 @@ .StorageContainerDatanodeProtocolProtos.Type; import org.apache.hadoop.ozone.protocol.proto .StorageContainerLocationProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB; import org.apache.hadoop.ozone.protocolPB .StorageContainerDatanodeProtocolServerSideTranslatorPB; @@ -325,6 +327,11 @@ public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd) return builder.setCmdType(Type.sendContainerReport) .setSendReport(SendContainerReportProto.getDefaultInstance()) .build(); + case reregisterCommand: + return builder.setCmdType(Type.reregisterCommand) + .setReregisterProto(SCMReregisterCmdResponseProto + .getDefaultInstance()) + .build(); default: throw new IllegalArgumentException("Not implemented"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java index 2c6cae8416..59326064ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java @@ -60,19 +60,12 @@ public CommandQueue() { @SuppressWarnings("unchecked") List getCommand(final DatanodeID datanodeID) { lock.lock(); - try { - if (commandMap.containsKey(datanodeID)) { - List temp = commandMap.get(datanodeID); - if (temp.size() > 0) { - commandMap.put(datanodeID, DEFAULT_LIST); - return temp; - } - } + List cmds = commandMap.remove(datanodeID); + return cmds == null ? DEFAULT_LIST : cmds; } finally { lock.unlock(); } - return DEFAULT_LIST; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java index 9d53e01e86..3ab72750ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.ReregisterCommand; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto @@ -585,7 +586,10 @@ private void handleHeartbeat(HeartbeatQueueItem hbItem) { updateNodeStat(datanodeID, nodeReport); return; } + LOG.warn("SCM receive heartbeat from unregistered datanode {}", datanodeID); + this.commandQueue.addCommand(hbItem.getDatanodeID(), + new ReregisterCommand()); } private void updateNodeStat(String datanodeID, SCMNodeReport nodeReport) { @@ -704,7 +708,6 @@ public SCMCommand register(DatanodeID datanodeID) { * @return SCMCommand */ private SCMCommand verifyDatanodeUUID(DatanodeID datanodeID) { - if (datanodeID.getDatanodeUuid() != null && nodes.containsKey(datanodeID.getDatanodeUuid())) { LOG.trace("Datanode is already registered. Datanode: {}", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto index 38c6dd16f4..212d9b737b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto @@ -171,6 +171,12 @@ message SCMRegisteredCmdResponseProto { optional SCMNodeAddressList addressList = 5; } +/** + * SCM informs a datanode to register itself again. + * With recieving this command, datanode will transit to REGISTER state. + */ +message SCMReregisterCmdResponseProto {} + /** * Container ID maintains the container's Identity along with cluster ID * after the registration is done. @@ -195,6 +201,7 @@ enum Type { versionCommand = 2; registeredCommand = 3; sendContainerReport = 4; + reregisterCommand = 5; } /* @@ -205,7 +212,7 @@ message SCMCommandResponseProto { optional SCMRegisteredCmdResponseProto registeredProto = 3; optional SCMVersionResponseProto versionProto = 4; optional SendContainerReportProto sendReport = 5; - + optional SCMReregisterCmdResponseProto reregisterProto = 6; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java index 25ef7cc981..4ef17921c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java @@ -17,11 +17,13 @@ */ package org.apache.hadoop.ozone.scm.node; +import com.google.common.base.Supplier; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.ozone.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMNodeReport; import org.apache.hadoop.ozone.protocol.proto @@ -46,6 +48,8 @@ import java.util.UUID; import java.util.concurrent.TimeoutException; +import static org.apache.hadoop.ozone.protocol.proto + .StorageContainerDatanodeProtocolProtos.Type; import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD; import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY; import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE; @@ -241,6 +245,49 @@ public void testScmShutdown() throws IOException, InterruptedException, nodeManager.getLastHBProcessedCount()); } + /** + * Asserts scm informs datanodes to re-register with the nodemanager + * on a restart. + * + * @throws Exception + */ + @Test + public void testScmHeartbeatAfterRestart() throws Exception { + OzoneConfiguration conf = getConf(); + conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100); + DatanodeID datanodeID = SCMTestUtils.getDatanodeID(); + try (SCMNodeManager nodemanager = createNodeManager(conf)) { + nodemanager.register(datanodeID); + List command = nodemanager.sendHeartbeat(datanodeID, null); + Assert.assertTrue(nodemanager.getAllNodes().contains(datanodeID)); + Assert.assertTrue("On regular HB calls, SCM responses a " + + "datanode with an empty command list", command.isEmpty()); + } + + // Sends heartbeat without registering to SCM. + // This happens when SCM restarts. + try (SCMNodeManager nodemanager = createNodeManager(conf)) { + Assert.assertFalse(nodemanager + .getAllNodes().contains(datanodeID)); + try { + // SCM handles heartbeat asynchronously. + // It may need more than one heartbeat processing to + // send the notification. + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + List command = + nodemanager.sendHeartbeat(datanodeID, null); + return command.size() == 1 && command.get(0).getType() + .equals(Type.reregisterCommand); + } + }, 100, 3 * 1000); + } catch (TimeoutException e) { + Assert.fail("Times out to verify that scm informs " + + "datanode to re-register itself."); + } + } + } + /** * Asserts that we detect as many healthy nodes as we have generated heartbeat * for.