From 632aca5793d391c741c0bce3d2e70ae6e03fe306 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Wed, 11 Jul 2018 12:08:50 -0700 Subject: [PATCH] HDDS-242. Introduce NEW_NODE, STALE_NODE and DEAD_NODE event and corresponding event handlers in SCM. Contributed by Nanda Kumar. Recommitting after making sure that patch is clean. --- .../container/CloseContainerEventHandler.java | 7 ++- .../hdds/scm/container/ContainerMapping.java | 5 -- .../scm/container/ContainerReportHandler.java | 47 +++++++++++++++++ .../hadoop/hdds/scm/container/Mapping.java | 9 +--- .../scm/container/closer/ContainerCloser.java | 1 - .../hadoop/hdds/scm/events/SCMEvents.java | 22 ++++++++ .../hadoop/hdds/scm/node/DatanodeInfo.java | 11 ++++ .../hadoop/hdds/scm/node/DeadNodeHandler.java | 42 ++++++++++++++++ .../hadoop/hdds/scm/node/NewNodeHandler.java | 50 +++++++++++++++++++ .../hadoop/hdds/scm/node/NodeManager.java | 4 +- .../hdds/scm/node/NodeReportHandler.java | 42 ++++++++++++++++ .../hdds/scm/node/NodeStateManager.java | 32 +++++++++++- .../hadoop/hdds/scm/node/SCMNodeManager.java | 24 ++++++--- .../hdds/scm/node/StaleNodeHandler.java | 42 ++++++++++++++++ .../SCMDatanodeHeartbeatDispatcher.java | 20 ++++++-- .../scm/server/SCMDatanodeProtocolServer.java | 18 ++----- .../scm/server/StorageContainerManager.java | 47 +++++++++++++---- .../hdds/scm/container/MockNodeManager.java | 9 ++++ .../TestCloseContainerEventHandler.java | 2 + .../hdds/scm/node/TestContainerPlacement.java | 12 ++++- .../hadoop/hdds/scm/node/TestNodeManager.java | 11 +++- .../TestSCMDatanodeHeartbeatDispatcher.java | 8 ++- .../testutils/ReplicationNodeManagerMock.java | 7 +++ 23 files changed, 415 insertions(+), 57 deletions(-) create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index f1053d5343..859e5d54a3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -25,9 +25,12 @@ import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; + /** * In case of a node failure, volume failure, volume out of spapce, node * out of space etc, CLOSE_CONTAINER will be triggered. @@ -73,9 +76,11 @@ public void onMessage(ContainerID containerID, EventPublisher publisher) { if (info.getState() == HddsProtos.LifeCycleState.OPEN) { for (DatanodeDetails datanode : containerWithPipeline.getPipeline().getMachines()) { - containerManager.getNodeManager().addDatanodeCommand(datanode.getUuid(), + CommandForDatanode closeContainerCommand = new CommandForDatanode<>( + datanode.getUuid(), new CloseContainerCommand(containerID.getId(), info.getReplicationType())); + publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand); } try { // Finalize event will make sure the state of the container transitions diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index e25c5b4740..abad32c6be 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -709,11 +709,6 @@ public void flushContainerInfo() throws IOException { } } - @Override - public NodeManager getNodeManager() { - return nodeManager; - } - @VisibleForTesting public MetadataStore getContainerStore() { return containerStore; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java new file mode 100644 index 0000000000..486162e3d3 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +/** + * Handles container reports from datanode. + */ +public class ContainerReportHandler implements + EventHandler { + + private final Mapping containerMapping; + private final Node2ContainerMap node2ContainerMap; + + public ContainerReportHandler(Mapping containerMapping, + Node2ContainerMap node2ContainerMap) { + this.containerMapping = containerMapping; + this.node2ContainerMap = node2ContainerMap; + } + + @Override + public void onMessage(ContainerReportFromDatanode containerReportFromDatanode, + EventPublisher publisher) { + // TODO: process container report. + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java index f52eb05ce0..ac84be44a3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.node.NodeManager; import java.io.Closeable; import java.io.IOException; @@ -129,17 +128,11 @@ void processContainerReports(DatanodeDetails datanodeDetails, void updateDeleteTransactionId(Map deleteTransactionMap) throws IOException; - /** - * Returns the nodeManager. - * @return NodeManager - */ - NodeManager getNodeManager(); - /** * Returns the ContainerWithPipeline. * @return NodeManager */ - public ContainerWithPipeline getMatchingContainerWithPipeline(final long size, + ContainerWithPipeline getMatchingContainerWithPipeline(long size, String owner, ReplicationType type, ReplicationFactor factor, LifeCycleState state) throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java index 3ca8ba91e6..eb591be487 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/closer/ContainerCloser.java @@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.util.Time; import org.slf4j.Logger; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 2c9c4310b9..0afd675581 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.events; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode; @@ -71,6 +72,27 @@ public final class SCMEvents { public static final TypedEvent CLOSE_CONTAINER = new TypedEvent<>(ContainerID.class, "Close_Container"); + /** + * This event will be triggered whenever a new datanode is + * registered with SCM. + */ + public static final TypedEvent NEW_NODE = + new TypedEvent<>(DatanodeDetails.class, "New_Node"); + + /** + * This event will be triggered whenever a datanode is moved from healthy to + * stale state. + */ + public static final TypedEvent STALE_NODE = + new TypedEvent<>(DatanodeDetails.class, "Stale_Node"); + + /** + * This event will be triggered whenever a datanode is moved from stale to + * dead state. + */ + public static final TypedEvent DEAD_NODE = + new TypedEvent<>(DatanodeDetails.class, "Dead_Node"); + /** * Private Ctor. Never Constructed. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java index 51465ee95d..6d5575b9ab 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java @@ -106,4 +106,15 @@ public List getStorageReports() { lock.readLock().unlock(); } } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java new file mode 100644 index 0000000000..427aef88ac --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +/** + * Handles Dead Node event. + */ +public class DeadNodeHandler implements EventHandler { + + private final Node2ContainerMap node2ContainerMap; + + public DeadNodeHandler(Node2ContainerMap node2ContainerMap) { + this.node2ContainerMap = node2ContainerMap; + } + + @Override + public void onMessage(DatanodeDetails datanodeDetails, + EventPublisher publisher) { + //TODO: add logic to handle dead node. + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java new file mode 100644 index 0000000000..79b75a5af0 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +import java.util.Collections; + +/** + * Handles New Node event. + */ +public class NewNodeHandler implements EventHandler { + + private final Node2ContainerMap node2ContainerMap; + + public NewNodeHandler(Node2ContainerMap node2ContainerMap) { + this.node2ContainerMap = node2ContainerMap; + } + + @Override + public void onMessage(DatanodeDetails datanodeDetails, + EventPublisher publisher) { + try { + node2ContainerMap.insertNewDatanode(datanodeDetails.getUuid(), + Collections.emptySet()); + } catch (SCMException e) { + // TODO: log exception message. + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index c13c37c136..5e2969d3cf 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -22,7 +22,9 @@ import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import java.io.Closeable; @@ -53,7 +55,7 @@ * list, by calling removeNode. We will throw away this nodes info soon. */ public interface NodeManager extends StorageContainerNodeProtocol, - NodeManagerMXBean, Closeable { + EventHandler, NodeManagerMXBean, Closeable { /** * Removes a data node from the management of this Node Manager. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java new file mode 100644 index 0000000000..aa78d53cfd --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeReportHandler.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .NodeReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +/** + * Handles Node Reports from datanode. + */ +public class NodeReportHandler implements EventHandler { + + private final NodeManager nodeManager; + + public NodeReportHandler(NodeManager nodeManager) { + this.nodeManager = nodeManager; + } + + @Override + public void onMessage(NodeReportFromDatanode nodeReportFromDatanode, + EventPublisher publisher) { + //TODO: process node report. + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java index 5543c04039..77f939eab1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java @@ -24,9 +24,12 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.HddsServerUtil; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.node.states.NodeStateMap; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.common.statemachine .InvalidStateTransitionException; import org.apache.hadoop.ozone.common.statemachine.StateMachine; @@ -36,9 +39,11 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; @@ -86,6 +91,14 @@ private enum NodeLifeCycleEvent { * This is the map which maintains the current state of all datanodes. */ private final NodeStateMap nodeStateMap; + /** + * Used for publishing node state change events. + */ + private final EventPublisher eventPublisher; + /** + * Maps the event to be triggered when a node state us updated. + */ + private final Map> state2EventMap; /** * ExecutorService used for scheduling heartbeat processing thread. */ @@ -108,8 +121,11 @@ private enum NodeLifeCycleEvent { * * @param conf Configuration */ - public NodeStateManager(Configuration conf) { - nodeStateMap = new NodeStateMap(); + public NodeStateManager(Configuration conf, EventPublisher eventPublisher) { + this.nodeStateMap = new NodeStateMap(); + this.eventPublisher = eventPublisher; + this.state2EventMap = new HashMap<>(); + initialiseState2EventMap(); Set finalStates = new HashSet<>(); finalStates.add(NodeState.DECOMMISSIONED); this.stateMachine = new StateMachine<>(NodeState.HEALTHY, finalStates); @@ -130,6 +146,14 @@ public NodeStateManager(Configuration conf) { TimeUnit.MILLISECONDS); } + /** + * Populates state2event map. + */ + private void initialiseState2EventMap() { + state2EventMap.put(NodeState.STALE, SCMEvents.STALE_NODE); + state2EventMap.put(NodeState.DEAD, SCMEvents.DEAD_NODE); + } + /* * * Node and State Transition Mapping: @@ -220,6 +244,7 @@ private void initializeStateMachine() { public void addNode(DatanodeDetails datanodeDetails) throws NodeAlreadyExistsException { nodeStateMap.addNode(datanodeDetails, stateMachine.getInitialState()); + eventPublisher.fireEvent(SCMEvents.NEW_NODE, datanodeDetails); } /** @@ -548,6 +573,9 @@ private void updateNodeState(DatanodeInfo node, Predicate condition, if (condition.test(node.getLastHeartbeatTime())) { NodeState newState = stateMachine.getNextState(state, lifeCycleEvent); nodeStateMap.updateNodeState(node.getUuid(), state, newState); + if (state2EventMap.containsKey(newState)) { + eventPublisher.fireEvent(state2EventMap.get(newState), node); + } } } catch (InvalidStateTransitionException e) { LOG.warn("Invalid state transition of node {}." + diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index d787d14487..2ba8067a05 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.VersionInfo; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; -import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -78,8 +77,7 @@ * as soon as you read it. */ public class SCMNodeManager - implements NodeManager, StorageContainerNodeProtocol, - EventHandler { + implements NodeManager, StorageContainerNodeProtocol { @VisibleForTesting static final Logger LOG = @@ -117,14 +115,13 @@ public class SCMNodeManager // Node pool manager. private final StorageContainerManager scmManager; - - /** * Constructs SCM machine Manager. */ public SCMNodeManager(OzoneConfiguration conf, String clusterID, - StorageContainerManager scmManager) throws IOException { - this.nodeStateManager = new NodeStateManager(conf); + StorageContainerManager scmManager, EventPublisher eventPublisher) + throws IOException { + this.nodeStateManager = new NodeStateManager(conf, eventPublisher); this.nodeStats = new ConcurrentHashMap<>(); this.scmStat = new SCMNodeStat(); this.clusterID = clusterID; @@ -462,14 +459,25 @@ public Map getNodeCount() { return nodeCountMap; } + // TODO: + // Since datanode commands are added through event queue, onMessage method + // should take care of adding commands to command queue. + // Refactor and remove all the usage of this method and delete this method. @Override public void addDatanodeCommand(UUID dnId, SCMCommand command) { this.commandQueue.addCommand(dnId, command); } + /** + * This method is called by EventQueue whenever someone adds a new + * DATANODE_COMMAND to the Queue. + * + * @param commandForDatanode DatanodeCommand + * @param ignored publisher + */ @Override public void onMessage(CommandForDatanode commandForDatanode, - EventPublisher publisher) { + EventPublisher ignored) { addDatanodeCommand(commandForDatanode.getDatanodeId(), commandForDatanode.getCommand()); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java new file mode 100644 index 0000000000..b37dd93978 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +/** + * Handles Stale node event. + */ +public class StaleNodeHandler implements EventHandler { + + private final Node2ContainerMap node2ContainerMap; + + public StaleNodeHandler(Node2ContainerMap node2ContainerMap) { + this.node2ContainerMap = node2ContainerMap; + } + + @Override + public void onMessage(DatanodeDetails datanodeDetails, + EventPublisher publisher) { + //TODO: logic to handle stale node. + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index a6354affaa..4cfa98fbd8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hdds.scm.server; +import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; @@ -24,12 +25,16 @@ .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.server.events.EventPublisher; import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; @@ -42,10 +47,15 @@ public final class SCMDatanodeHeartbeatDispatcher { private static final Logger LOG = LoggerFactory.getLogger(SCMDatanodeHeartbeatDispatcher.class); - private EventPublisher eventPublisher; + private final NodeManager nodeManager; + private final EventPublisher eventPublisher; - public SCMDatanodeHeartbeatDispatcher(EventPublisher eventPublisher) { + public SCMDatanodeHeartbeatDispatcher(NodeManager nodeManager, + EventPublisher eventPublisher) { + Preconditions.checkNotNull(nodeManager); + Preconditions.checkNotNull(eventPublisher); + this.nodeManager = nodeManager; this.eventPublisher = eventPublisher; } @@ -54,11 +64,14 @@ public SCMDatanodeHeartbeatDispatcher(EventPublisher eventPublisher) { * Dispatches heartbeat to registered event handlers. * * @param heartbeat heartbeat to be dispatched. + * + * @return list of SCMCommand */ - public void dispatch(SCMHeartbeatRequestProto heartbeat) { + public List dispatch(SCMHeartbeatRequestProto heartbeat) { DatanodeDetails datanodeDetails = DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails()); // should we dispatch heartbeat through eventPublisher? + List commands = nodeManager.processHeartbeat(datanodeDetails); if (heartbeat.hasNodeReport()) { LOG.debug("Dispatching Node Report."); eventPublisher.fireEvent(NODE_REPORT, @@ -73,6 +86,7 @@ public void dispatch(SCMHeartbeatRequestProto heartbeat) { heartbeat.getContainerReport())); } + return commands; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index aef5b03ce2..aee64b9c54 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -133,7 +133,8 @@ public SCMDatanodeProtocolServer(final OzoneConfiguration conf, conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT); - heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher(eventPublisher); + heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher( + scm.getScmNodeManager(), eventPublisher); RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, ProtobufRpcEngine.class); @@ -214,22 +215,13 @@ public static SCMRegisteredResponseProto getRegisteredResponse( @Override public SCMHeartbeatResponseProto sendHeartbeat( - SCMHeartbeatRequestProto heartbeat) - throws IOException { - heartbeatDispatcher.dispatch(heartbeat); - - // TODO: Remove the below code after SCM refactoring. - DatanodeDetails datanodeDetails = DatanodeDetails - .getFromProtoBuf(heartbeat.getDatanodeDetails()); - NodeReportProto nodeReport = heartbeat.getNodeReport(); - List commands = - scm.getScmNodeManager().processHeartbeat(datanodeDetails); + SCMHeartbeatRequestProto heartbeat) throws IOException { List cmdResponses = new LinkedList<>(); - for (SCMCommand cmd : commands) { + for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) { cmdResponses.add(getCommandResponse(cmd)); } return SCMHeartbeatResponseProto.newBuilder() - .setDatanodeUUID(datanodeDetails.getUuidString()) + .setDatanodeUUID(heartbeat.getDatanodeDetails().getUuid()) .addAllCommands(cmdResponses).build(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 49d3a40542..5f511eee31 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -33,15 +33,23 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; +import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerMapping; +import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.Mapping; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; +import org.apache.hadoop.hdds.scm.node.DeadNodeHandler; +import org.apache.hadoop.hdds.scm.node.NewNodeHandler; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeReportHandler; import org.apache.hadoop.hdds.scm.node.SCMNodeManager; +import org.apache.hadoop.hdds.scm.node.StaleNodeHandler; +import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.hdfs.DFSUtil; @@ -71,7 +79,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB; -import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED; import static org.apache.hadoop.util.ExitUtil.terminate; @@ -126,6 +133,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private final Mapping scmContainerManager; private final BlockManager scmBlockManager; private final SCMStorage scmStorage; + + private final EventQueue eventQueue; /* * HTTP endpoint for JMX access. */ @@ -164,18 +173,35 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { throw new SCMException("SCM not initialized.", ResultCodes .SCM_NOT_INITIALIZED); } - EventQueue eventQueue = new EventQueue(); - SCMNodeManager nm = - new SCMNodeManager(conf, scmStorage.getClusterID(), this); - scmNodeManager = nm; - eventQueue.addHandler(DATANODE_COMMAND, nm); + eventQueue = new EventQueue(); - scmContainerManager = new ContainerMapping(conf, getScmNodeManager(), - cacheSize); + scmNodeManager = new SCMNodeManager( + conf, scmStorage.getClusterID(), this, eventQueue); + scmContainerManager = new ContainerMapping( + conf, getScmNodeManager(), cacheSize); + scmBlockManager = new BlockManagerImpl( + conf, getScmNodeManager(), scmContainerManager); - scmBlockManager = - new BlockManagerImpl(conf, getScmNodeManager(), scmContainerManager); + Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); + + CloseContainerEventHandler closeContainerHandler = + new CloseContainerEventHandler(scmContainerManager); + NodeReportHandler nodeReportHandler = + new NodeReportHandler(scmNodeManager); + ContainerReportHandler containerReportHandler = + new ContainerReportHandler(scmContainerManager, node2ContainerMap); + NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap); + StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap); + DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap); + + eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); + eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler); + eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler); + eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler); + eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler); + eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler); + eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys .OZONE_ADMINISTRATORS); @@ -189,7 +215,6 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { blockProtocolServer = new SCMBlockProtocolServer(conf, this); clientProtocolServer = new SCMClientProtocolServer(conf, this); httpServer = new StorageContainerManagerHttpServer(conf); - registerMXBean(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 3357992879..5e83c288ed 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -26,8 +26,10 @@ .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.assertj.core.util.Preconditions; @@ -399,6 +401,13 @@ public void delContainer(DatanodeDetails datanodeDetails, long size) { } } + @Override + public void onMessage(CommandForDatanode commandForDatanode, + EventPublisher publisher) { + addDatanodeCommand(commandForDatanode.getDatanodeId(), + commandForDatanode.getCommand()); + } + /** * A class to declare some values for the nodes so that our tests * won't fail. diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java index 0d46ffae56..0764b12743 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java @@ -41,6 +41,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER; +import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; /** * Tests the closeContainerEventHandler class. @@ -69,6 +70,7 @@ public static void setUp() throws Exception { eventQueue = new EventQueue(); eventQueue.addHandler(CLOSE_CONTAINER, new CloseContainerEventHandler(mapping)); + eventQueue.addHandler(DATANODE_COMMAND, nodeManager); } @AfterClass diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java index c6ea2af121..48567ee2bf 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java @@ -34,6 +34,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.test.PathUtils; @@ -41,6 +43,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -86,8 +89,15 @@ OzoneConfiguration getConf() { SCMNodeManager createNodeManager(OzoneConfiguration config) throws IOException { + EventQueue eventQueue = new EventQueue(); + eventQueue.addHandler(SCMEvents.NEW_NODE, + Mockito.mock(NewNodeHandler.class)); + eventQueue.addHandler(SCMEvents.STALE_NODE, + Mockito.mock(StaleNodeHandler.class)); + eventQueue.addHandler(SCMEvents.DEAD_NODE, + Mockito.mock(DeadNodeHandler.class)); SCMNodeManager nodeManager = new SCMNodeManager(config, - UUID.randomUUID().toString(), null); + UUID.randomUUID().toString(), null, eventQueue); assertFalse("Node manager should be in chill mode", nodeManager.isOutOfChillMode()); return nodeManager; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java index d72309e979..cefd179a8f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeManager.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.StorageReportProto; +import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; @@ -45,6 +46,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -124,8 +126,15 @@ OzoneConfiguration getConf() { SCMNodeManager createNodeManager(OzoneConfiguration config) throws IOException { + EventQueue eventQueue = new EventQueue(); + eventQueue.addHandler(SCMEvents.NEW_NODE, + Mockito.mock(NewNodeHandler.class)); + eventQueue.addHandler(SCMEvents.STALE_NODE, + Mockito.mock(StaleNodeHandler.class)); + eventQueue.addHandler(SCMEvents.DEAD_NODE, + Mockito.mock(DeadNodeHandler.class)); SCMNodeManager nodeManager = new SCMNodeManager(config, - UUID.randomUUID().toString(), null); + UUID.randomUUID().toString(), null, eventQueue); assertFalse("Node manager should be in chill mode", nodeManager.isOutOfChillMode()); return nodeManager; diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java index a77ed0452d..042e3ccfe1 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher @@ -37,6 +38,7 @@ import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT; import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT; @@ -55,7 +57,8 @@ public void testNodeReportDispatcher() throws IOException { NodeReportProto nodeReport = NodeReportProto.getDefaultInstance(); SCMDatanodeHeartbeatDispatcher dispatcher = - new SCMDatanodeHeartbeatDispatcher(new EventPublisher() { + new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class), + new EventPublisher() { @Override public > void fireEvent( EVENT_TYPE event, PAYLOAD payload) { @@ -90,7 +93,8 @@ public void testContainerReportDispatcher() throws IOException { ContainerReportsProto.getDefaultInstance(); SCMDatanodeHeartbeatDispatcher dispatcher = - new SCMDatanodeHeartbeatDispatcher(new EventPublisher() { + new SCMDatanodeHeartbeatDispatcher(Mockito.mock(NodeManager.class), + new EventPublisher() { @Override public > void fireEvent( EVENT_TYPE event, PAYLOAD payload) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index e15e0fcfc4..2d27d7143e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -28,7 +28,9 @@ .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto; +import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.ozone.protocol.VersionResponse; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; @@ -287,4 +289,9 @@ public void addDatanodeCommand(UUID dnId, SCMCommand command) { this.commandQueue.addCommand(dnId, command); } + @Override + public void onMessage(CommandForDatanode commandForDatanode, + EventPublisher publisher) { + // do nothing. + } }