HDDS-234. Add SCM node report handler.
Contributed by Ajay Kumar.
This commit is contained in:
parent
5ee90efed3
commit
556d9b36be
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdds.scm.node;
|
package org.apache.hadoop.hdds.scm.node;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
|
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
|
||||||
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
|
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
|
||||||
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
|
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
|
||||||
@ -138,4 +139,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
|||||||
* @param command
|
* @param command
|
||||||
*/
|
*/
|
||||||
void addDatanodeCommand(UUID dnId, SCMCommand command);
|
void addDatanodeCommand(UUID dnId, SCMCommand command);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process node report.
|
||||||
|
*
|
||||||
|
* @param dnUuid
|
||||||
|
* @param nodeReport
|
||||||
|
*/
|
||||||
|
void processNodeReport(UUID dnUuid, NodeReportProto nodeReport);
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,7 @@
|
|||||||
* "License"); you may not use this file except in compliance
|
* "License"); you may not use this file except in compliance
|
||||||
* with the License. You may obtain a copy of the License at
|
* with the License. You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
@ -18,25 +18,38 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.hdds.scm.node;
|
package org.apache.hadoop.hdds.scm.node;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
||||||
.NodeReportFromDatanode;
|
.NodeReportFromDatanode;
|
||||||
import org.apache.hadoop.hdds.server.events.EventHandler;
|
import org.apache.hadoop.hdds.server.events.EventHandler;
|
||||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Handles Node Reports from datanode.
|
* Handles Node Reports from datanode.
|
||||||
*/
|
*/
|
||||||
public class NodeReportHandler implements EventHandler<NodeReportFromDatanode> {
|
public class NodeReportHandler implements EventHandler<NodeReportFromDatanode> {
|
||||||
|
|
||||||
|
private static final Logger LOGGER = LoggerFactory
|
||||||
|
.getLogger(NodeReportHandler.class);
|
||||||
private final NodeManager nodeManager;
|
private final NodeManager nodeManager;
|
||||||
|
|
||||||
public NodeReportHandler(NodeManager nodeManager) {
|
public NodeReportHandler(NodeManager nodeManager) {
|
||||||
|
Preconditions.checkNotNull(nodeManager);
|
||||||
this.nodeManager = nodeManager;
|
this.nodeManager = nodeManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(NodeReportFromDatanode nodeReportFromDatanode,
|
public void onMessage(NodeReportFromDatanode nodeReportFromDatanode,
|
||||||
EventPublisher publisher) {
|
EventPublisher publisher) {
|
||||||
//TODO: process node report.
|
Preconditions.checkNotNull(nodeReportFromDatanode);
|
||||||
|
DatanodeDetails dn = nodeReportFromDatanode.getDatanodeDetails();
|
||||||
|
Preconditions.checkNotNull(dn, "NodeReport is "
|
||||||
|
+ "missing DatanodeDetails.");
|
||||||
|
LOGGER.trace("Processing node report for dn: {}", dn);
|
||||||
|
nodeManager
|
||||||
|
.processNodeReport(dn.getUuid(), nodeReportFromDatanode.getReport());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -422,6 +422,17 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
|
|||||||
return commandQueue.getCommand(datanodeDetails.getUuid());
|
return commandQueue.getCommand(datanodeDetails.getUuid());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process node report.
|
||||||
|
*
|
||||||
|
* @param dnUuid
|
||||||
|
* @param nodeReport
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
|
||||||
|
this.updateNodeStat(dnUuid, nodeReport);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the aggregated node stats.
|
* Returns the aggregated node stats.
|
||||||
* @return the aggregated node stats.
|
* @return the aggregated node stats.
|
||||||
|
@ -295,6 +295,17 @@ public void addDatanodeCommand(UUID dnId, SCMCommand command) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty implementation for processNodeReport.
|
||||||
|
*
|
||||||
|
* @param dnUuid
|
||||||
|
* @param nodeReport
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
// Returns the number of commands that is queued to this node manager.
|
// Returns the number of commands that is queued to this node manager.
|
||||||
public int getCommandCount(DatanodeDetails dd) {
|
public int getCommandCount(DatanodeDetails dd) {
|
||||||
List<SCMCommand> list = commandMap.get(dd.getUuid());
|
List<SCMCommand> list = commandMap.get(dd.getUuid());
|
||||||
|
@ -0,0 +1,95 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with this
|
||||||
|
* work for additional information regarding copyright ownership. The ASF
|
||||||
|
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdds.scm.node;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
|
||||||
|
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
|
||||||
|
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
|
||||||
|
import org.apache.hadoop.hdds.server.events.Event;
|
||||||
|
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||||
|
import org.apache.hadoop.hdds.server.events.EventQueue;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class TestNodeReportHandler implements EventPublisher {
|
||||||
|
|
||||||
|
private static Logger LOG = LoggerFactory
|
||||||
|
.getLogger(TestNodeReportHandler.class);
|
||||||
|
private NodeReportHandler nodeReportHandler;
|
||||||
|
private SCMNodeManager nodeManager;
|
||||||
|
private String storagePath = GenericTestUtils.getRandomizedTempPath()
|
||||||
|
.concat("/" + UUID.randomUUID().toString());
|
||||||
|
;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void resetEventCollector() throws IOException {
|
||||||
|
OzoneConfiguration conf = new OzoneConfiguration();
|
||||||
|
nodeManager = new SCMNodeManager(conf, "cluster1", null, new EventQueue());
|
||||||
|
nodeReportHandler = new NodeReportHandler(nodeManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeReport() throws IOException {
|
||||||
|
DatanodeDetails dn = TestUtils.getDatanodeDetails();
|
||||||
|
List<StorageReportProto> reports =
|
||||||
|
TestUtils.createStorageReport(100, 10, 90, storagePath, null,
|
||||||
|
dn.getUuid().toString(), 1);
|
||||||
|
|
||||||
|
nodeReportHandler.onMessage(
|
||||||
|
getNodeReport(dn, reports), this);
|
||||||
|
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn);
|
||||||
|
|
||||||
|
Assert.assertTrue(nodeMetric.get().getCapacity().get() == 100);
|
||||||
|
Assert.assertTrue(nodeMetric.get().getRemaining().get() == 90);
|
||||||
|
Assert.assertTrue(nodeMetric.get().getScmUsed().get() == 10);
|
||||||
|
|
||||||
|
reports =
|
||||||
|
TestUtils.createStorageReport(100, 10, 90, storagePath, null,
|
||||||
|
dn.getUuid().toString(), 2);
|
||||||
|
nodeReportHandler.onMessage(
|
||||||
|
getNodeReport(dn, reports), this);
|
||||||
|
nodeMetric = nodeManager.getNodeStat(dn);
|
||||||
|
|
||||||
|
Assert.assertTrue(nodeMetric.get().getCapacity().get() == 200);
|
||||||
|
Assert.assertTrue(nodeMetric.get().getRemaining().get() == 180);
|
||||||
|
Assert.assertTrue(nodeMetric.get().getScmUsed().get() == 20);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeReportFromDatanode getNodeReport(DatanodeDetails dn,
|
||||||
|
List<StorageReportProto> reports) {
|
||||||
|
NodeReportProto nodeReportProto = TestUtils.createNodeReport(reports);
|
||||||
|
return new NodeReportFromDatanode(dn, nodeReportProto);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
|
||||||
|
EVENT_TYPE event, PAYLOAD payload) {
|
||||||
|
LOG.info("Event is published: {}", payload);
|
||||||
|
}
|
||||||
|
}
|
@ -289,6 +289,16 @@ public void addDatanodeCommand(UUID dnId, SCMCommand command) {
|
|||||||
this.commandQueue.addCommand(dnId, command);
|
this.commandQueue.addCommand(dnId, command);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empty implementation for processNodeReport.
|
||||||
|
* @param dnUuid
|
||||||
|
* @param nodeReport
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void processNodeReport(UUID dnUuid, NodeReportProto nodeReport) {
|
||||||
|
// do nothing.
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(CommandForDatanode commandForDatanode,
|
public void onMessage(CommandForDatanode commandForDatanode,
|
||||||
EventPublisher publisher) {
|
EventPublisher publisher) {
|
||||||
|
Loading…
Reference in New Issue
Block a user