diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java new file mode 100644 index 0000000000..36f10a93dc --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.TypedEvent; + +import com.google.protobuf.GeneratedMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is responsible for dispatching heartbeat from datanode to + * appropriate EventHandler at SCM. + */ +public final class SCMDatanodeHeartbeatDispatcher { + + private static final Logger LOG = + LoggerFactory.getLogger(SCMDatanodeHeartbeatDispatcher.class); + + private EventPublisher eventPublisher; + + public static final TypedEvent NODE_REPORT = + new TypedEvent<>(NodeReportFromDatanode.class); + + public static final TypedEvent CONTAINER_REPORT = + new TypedEvent(ContainerReportFromDatanode.class); + + public SCMDatanodeHeartbeatDispatcher(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + } + + + /** + * Dispatches heartbeat to registered event handlers. + * + * @param heartbeat heartbeat to be dispatched. + */ + public void dispatch(SCMHeartbeatRequestProto heartbeat) { + DatanodeDetails datanodeDetails = + DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails()); + + if (heartbeat.hasNodeReport()) { + eventPublisher.fireEvent(NODE_REPORT, + new NodeReportFromDatanode(datanodeDetails, + heartbeat.getNodeReport())); + } + + if (heartbeat.hasContainerReport()) { + eventPublisher.fireEvent(CONTAINER_REPORT, + new ContainerReportFromDatanode(datanodeDetails, + heartbeat.getContainerReport())); + + } + } + + /** + * Wrapper class for events with the datanode origin. + */ + public static class ReportFromDatanode { + + private final DatanodeDetails datanodeDetails; + + private final T report; + + public ReportFromDatanode(DatanodeDetails datanodeDetails, T report) { + this.datanodeDetails = datanodeDetails; + this.report = report; + } + + public DatanodeDetails getDatanodeDetails() { + return datanodeDetails; + } + + public T getReport() { + return report; + } + } + + /** + * Node report event payload with origin. + */ + public static class NodeReportFromDatanode + extends ReportFromDatanode { + + public NodeReportFromDatanode(DatanodeDetails datanodeDetails, + NodeReportProto report) { + super(datanodeDetails, report); + } + } + + /** + * Container report event payload with origin. + */ + public static class ContainerReportFromDatanode + extends ReportFromDatanode { + + public ContainerReportFromDatanode(DatanodeDetails datanodeDetails, + ContainerReportsProto report) { + super(datanodeDetails, report); + } + } + +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java index eb5ce1a827..56b07190a5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java @@ -73,7 +73,7 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil; -import org.apache.hadoop.hdds.scm.server.report.SCMDatanodeHeartbeatDispatcher; +import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; @@ -122,14 +122,19 @@ public class SCMDatanodeProtocolServer implements private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher; public SCMDatanodeProtocolServer(final OzoneConfiguration conf, - StorageContainerManager scm) throws IOException { + StorageContainerManager scm, EventPublisher eventPublisher) + throws IOException { Preconditions.checkNotNull(scm, "SCM cannot be null"); + Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null"); + this.scm = scm; final int handlerCount = conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT); + heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher(eventPublisher); + RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, ProtobufRpcEngine.class); BlockingService dnProtoPbService = @@ -155,10 +160,6 @@ public SCMDatanodeProtocolServer(final OzoneConfiguration conf, conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer); - heartbeatDispatcher = SCMDatanodeHeartbeatDispatcher.newBuilder(conf, scm) - .addHandlerFor(NodeReportProto.class) - .addHandlerFor(ContainerReportsProto.class) - .build(); } public void start() { @@ -319,7 +320,6 @@ public void stop() { try { LOG.info("Stopping the RPC server for DataNodes"); datanodeRpcServer.stop(); - heartbeatDispatcher.shutdown(); } catch (Exception ex) { LOG.error(" datanodeRpcServer stop failed.", ex); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 5725d236ae..568a86ab4f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -52,7 +52,6 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.common.StorageInfo; -import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; @@ -182,7 +181,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException { scmAdminUsernames.add(scmUsername); } - datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this); + datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this, + eventQueue); blockProtocolServer = new SCMBlockProtocolServer(conf, this); clientProtocolServer = new SCMClientProtocolServer(conf, this); httpServer = new StorageContainerManagerHttpServer(conf); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java deleted file mode 100644 index 00ce94d7f5..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeContainerReportHandler.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Handler for Datanode Container Report. - */ -public class SCMDatanodeContainerReportHandler extends - SCMDatanodeReportHandler { - - private static final Logger LOG = LoggerFactory.getLogger( - SCMDatanodeContainerReportHandler.class); - - @Override - public void processReport(DatanodeDetails datanodeDetails, - ContainerReportsProto report) throws IOException { - LOG.trace("Processing container report from {}.", datanodeDetails); - updateContainerReportMetrics(datanodeDetails, report); - getSCM().getScmContainerManager() - .processContainerReports(datanodeDetails, report); - } - - /** - * Updates container report metrics in SCM. - * - * @param datanodeDetails Datanode Information - * @param reports Container Reports - */ - private void updateContainerReportMetrics(DatanodeDetails datanodeDetails, - ContainerReportsProto reports) { - ContainerStat newStat = new ContainerStat(); - for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports - .getReportsList()) { - newStat.add(new ContainerStat(info.getSize(), info.getUsed(), - info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(), - info.getReadCount(), info.getWriteCount())); - } - // update container metrics - StorageContainerManager.getMetrics().setLastContainerStat(newStat); - - // Update container stat entry, this will trigger a removal operation if it - // exists in cache. - String datanodeUuid = datanodeDetails.getUuidString(); - getSCM().getContainerReportCache().put(datanodeUuid, newStat); - // update global view container metrics - StorageContainerManager.getMetrics().incrContainerStat(newStat); - } - -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java deleted file mode 100644 index d50edff7c5..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeHeartbeatDispatcher.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.protobuf.GeneratedMessage; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.util.concurrent.HadoopExecutors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutorService; - -/** - * This class is responsible for dispatching heartbeat from datanode to - * appropriate ReportHandlers at SCM. - * Only one handler per report is supported now, it's very easy to support - * multiple handlers for a report. - */ -public final class SCMDatanodeHeartbeatDispatcher { - - private static final Logger LOG = LoggerFactory.getLogger( - SCMDatanodeHeartbeatDispatcher.class); - - /** - * This stores Report to Handler mapping. - */ - private final Map, - SCMDatanodeReportHandler> handlers; - - /** - * Executor service which will be used for processing reports. - */ - private final ExecutorService executorService; - - /** - * Constructs SCMDatanodeHeartbeatDispatcher instance with the given - * handlers. - * - * @param handlers report to report handler mapping - */ - private SCMDatanodeHeartbeatDispatcher(Map, - SCMDatanodeReportHandler> handlers) { - this.handlers = handlers; - this.executorService = HadoopExecutors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("SCMDatanode Heartbeat Dispatcher Thread - %d") - .build()); - } - - /** - * Dispatches heartbeat to registered handlers. - * - * @param heartbeat heartbeat to be dispatched. - */ - public void dispatch(SCMHeartbeatRequestProto heartbeat) { - DatanodeDetails datanodeDetails = DatanodeDetails - .getFromProtoBuf(heartbeat.getDatanodeDetails()); - if (heartbeat.hasNodeReport()) { - processReport(datanodeDetails, heartbeat.getNodeReport()); - } - if (heartbeat.hasContainerReport()) { - processReport(datanodeDetails, heartbeat.getContainerReport()); - } - } - - /** - * Invokes appropriate ReportHandler and submits the task to executor - * service for processing. - * - * @param datanodeDetails Datanode Information - * @param report Report to be processed - */ - @SuppressWarnings("unchecked") - private void processReport(DatanodeDetails datanodeDetails, - GeneratedMessage report) { - executorService.submit(() -> { - try { - SCMDatanodeReportHandler handler = handlers.get(report.getClass()); - handler.processReport(datanodeDetails, report); - } catch (IOException ex) { - LOG.error("Exception wile processing report {}, from {}", - report.getClass(), datanodeDetails, ex); - } - }); - } - - /** - * Shuts down SCMDatanodeHeartbeatDispatcher. - */ - public void shutdown() { - executorService.shutdown(); - } - - /** - * Returns a new Builder to construct {@link SCMDatanodeHeartbeatDispatcher}. - * - * @param conf Configuration to be used by SCMDatanodeHeartbeatDispatcher - * @param scm {@link StorageContainerManager} instance to be used by report - * handlers - * - * @return {@link SCMDatanodeHeartbeatDispatcher.Builder} instance - */ - public static Builder newBuilder(Configuration conf, - StorageContainerManager scm) { - return new Builder(conf, scm); - } - - /** - * Builder for SCMDatanodeHeartbeatDispatcher. - */ - public static class Builder { - - private final SCMDatanodeReportHandlerFactory reportHandlerFactory; - private final Map, - SCMDatanodeReportHandler> report2handler; - - /** - * Constructs SCMDatanodeHeartbeatDispatcher.Builder instance. - * - * @param conf Configuration object to be used. - * @param scm StorageContainerManager instance to be used for report - * handler initialization. - */ - private Builder(Configuration conf, StorageContainerManager scm) { - this.report2handler = new HashMap<>(); - this.reportHandlerFactory = - new SCMDatanodeReportHandlerFactory(conf, scm); - } - - /** - * Adds new report handler for the given report. - * - * @param report Report for which handler has to be added - * - * @return Builder - */ - public Builder addHandlerFor(Class report) { - report2handler.put(report, reportHandlerFactory.getHandlerFor(report)); - return this; - } - - /** - * Associates the given report handler for the given report. - * - * @param report Report to be associated with - * @param handler Handler to be used for the report - * - * @return Builder - */ - public Builder addHandler(Class report, - SCMDatanodeReportHandler handler) { - report2handler.put(report, handler); - return this; - } - - /** - * Builds and returns {@link SCMDatanodeHeartbeatDispatcher} instance. - * - * @return SCMDatanodeHeartbeatDispatcher - */ - public SCMDatanodeHeartbeatDispatcher build() { - return new SCMDatanodeHeartbeatDispatcher(report2handler); - } - } - -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java deleted file mode 100644 index fb89b02215..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeNodeReportHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Handles Datanode Node Report. - */ -public class SCMDatanodeNodeReportHandler extends - SCMDatanodeReportHandler { - - private static final Logger LOG = LoggerFactory.getLogger( - SCMDatanodeNodeReportHandler.class); - - @Override - public void processReport(DatanodeDetails datanodeDetails, - NodeReportProto report) throws IOException { - LOG.debug("Processing node report from {}.", datanodeDetails); - //TODO: add logic to process node report. - } -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java deleted file mode 100644 index d3386493c1..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandler.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - -import com.google.protobuf.GeneratedMessage; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; - -import java.io.IOException; - -/** - * Datanode Report handlers should implement this interface in order to get - * call back whenever the report is received from datanode. - * - * @param Type of report the handler is interested in. - */ -public abstract class SCMDatanodeReportHandler - implements Configurable { - - private Configuration config; - private StorageContainerManager scm; - - /** - * Initializes SCMDatanodeReportHandler and associates it with the given - * StorageContainerManager instance. - * - * @param storageContainerManager StorageContainerManager instance to be - * associated with. - */ - public void init(StorageContainerManager storageContainerManager) { - this.scm = storageContainerManager; - } - - /** - * Returns the associated StorageContainerManager instance. This will be - * used by the ReportHandler implementations. - * - * @return {@link StorageContainerManager} - */ - protected StorageContainerManager getSCM() { - return scm; - } - - @Override - public void setConf(Configuration conf) { - this.config = conf; - } - - @Override - public Configuration getConf() { - return config; - } - - /** - * Processes the report received from datanode. Each ReportHandler - * implementation is responsible for providing the logic to process the - * report it's interested in. - * - * @param datanodeDetails Datanode Information - * @param report Report to be processed - * - * @throws IOException In case of any exception - */ - abstract void processReport(DatanodeDetails datanodeDetails, T report) - throws IOException; -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java deleted file mode 100644 index e88495fc23..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/SCMDatanodeReportHandlerFactory.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - -import com.google.protobuf.GeneratedMessage; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.util.ReflectionUtils; - -import java.util.HashMap; -import java.util.Map; - - -/** - * Factory class to construct {@link SCMDatanodeReportHandler} given a report. - */ -public class SCMDatanodeReportHandlerFactory { - - private final Configuration conf; - private final StorageContainerManager scm; - private final Map, - Class>> - report2handler; - - /** - * Constructs {@link SCMDatanodeReportHandler} instance. - * - * @param conf Configuration to be passed to the - * {@link SCMDatanodeReportHandler} - */ - public SCMDatanodeReportHandlerFactory(Configuration conf, - StorageContainerManager scm) { - this.conf = conf; - this.scm = scm; - this.report2handler = new HashMap<>(); - - report2handler.put(NodeReportProto.class, - SCMDatanodeNodeReportHandler.class); - report2handler.put(ContainerReportsProto.class, - SCMDatanodeContainerReportHandler.class); - } - - /** - * Returns the SCMDatanodeReportHandler for the corresponding report. - * - * @param report report - * - * @return report handler - */ - public SCMDatanodeReportHandler getHandlerFor( - Class report) { - Class> - handlerClass = report2handler.get(report); - if (handlerClass == null) { - throw new RuntimeException("No handler found for report " + report); - } - SCMDatanodeReportHandler instance = - ReflectionUtils.newInstance(handlerClass, conf); - instance.init(scm); - return instance; - } - -} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java deleted file mode 100644 index fda3993096..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/report/package-info.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.server.report; -/** - * Handling of all the datanode reports in SCM which are received through - * heartbeat is done here. - * - * SCM Datanode Report Processing State Diagram: - * - * SCMDatanode SCMDatanodeHeartbeat SCMDatanodeReport - * ProtocolServer Dispatcher Handler - * | | | - * | | | - * | construct | | - * |----------------------->| | - * | | | - * | | register | - * | |<-----------------------| - * | | | - * +------------+------------------------+------------------------+--------+ - * | loop | | | | - * | | | | | - * | | | | | - * | heartbeat | | | | - * - +----------->| | | | - * | from | heartbeat | | | - * | Datanode |----------------------->| | | - * | | | report | | - * | | |----------------------->| | - * | | | | | - * | DN | | | | - * <-+------------| | | | - * | commands | | | | - * | | | | | - * +------------+------------------------+------------------------+--------+ - * | | | - * | | | - * | shutdown | | - * |----------------------->| | - * | | | - * | | | - * - - - - */ \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java new file mode 100644 index 0000000000..326a34b792 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMDatanodeHeartbeatDispatcher.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .NodeReportFromDatanode; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; + +import org.junit.Assert; +import org.junit.Test; + +/** + * This class tests the behavior of SCMDatanodeHeartbeatDispatcher. + */ +public class TestSCMDatanodeHeartbeatDispatcher { + + + @Test + public void testNodeReportDispatcher() throws IOException { + + Configuration conf = new OzoneConfiguration(); + + AtomicInteger eventReceived = new AtomicInteger(); + + NodeReportProto nodeReport = NodeReportProto.getDefaultInstance(); + + SCMDatanodeHeartbeatDispatcher dispatcher = + new SCMDatanodeHeartbeatDispatcher(new EventPublisher() { + @Override + public > void fireEvent( + EVENT_TYPE event, PAYLOAD payload) { + Assert.assertEquals(event, + SCMDatanodeHeartbeatDispatcher.NODE_REPORT); + eventReceived.incrementAndGet(); + Assert.assertEquals(nodeReport, ((NodeReportFromDatanode)payload).getReport()); + + } + }); + + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + + SCMHeartbeatRequestProto heartbeat = + SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .setNodeReport(nodeReport) + .build(); + dispatcher.dispatch(heartbeat); + Assert.assertEquals(1, eventReceived.get()); + + + } + + @Test + public void testContainerReportDispatcher() throws IOException { + + Configuration conf = new OzoneConfiguration(); + + AtomicInteger eventReceived = new AtomicInteger(); + + ContainerReportsProto containerReport = + ContainerReportsProto.getDefaultInstance(); + + SCMDatanodeHeartbeatDispatcher dispatcher = + new SCMDatanodeHeartbeatDispatcher(new EventPublisher() { + @Override + public > void fireEvent( + EVENT_TYPE event, PAYLOAD payload) { + Assert.assertEquals(event, + SCMDatanodeHeartbeatDispatcher.CONTAINER_REPORT); + Assert.assertEquals(containerReport, ((ContainerReportFromDatanode)payload).getReport()); + eventReceived.incrementAndGet(); + } + }); + + DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); + + SCMHeartbeatRequestProto heartbeat = + SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) + .setContainerReport(containerReport) + .build(); + dispatcher.dispatch(heartbeat); + Assert.assertEquals(1, eventReceived.get()); + + + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java deleted file mode 100644 index 776ae88754..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeContainerReportHandler.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - -import org.junit.Assert; -import org.junit.Test; - -/** - * Test cases to verify SCMDatanodeContainerReportHandler's behavior. - */ -public class TestSCMDatanodeContainerReportHandler { - - //TODO: add test cases to verify SCMDatanodeContainerReportHandler. - - @Test - public void dummyTest() { - Assert.assertTrue(true); - } -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java deleted file mode 100644 index 5d086471c1..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeHeartbeatDispatcher.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.apache.hadoop.hdds.scm.TestUtils; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -import java.io.IOException; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -/** - * This class tests the behavior of SCMDatanodeHeartbeatDispatcher. - */ -public class TestSCMDatanodeHeartbeatDispatcher { - - @Test - public void testSCMDatanodeHeartbeatDispatcherBuilder() { - Configuration conf = new OzoneConfiguration(); - SCMDatanodeHeartbeatDispatcher dispatcher = - SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null) - .addHandlerFor(NodeReportProto.class) - .addHandlerFor(ContainerReportsProto.class) - .build(); - Assert.assertNotNull(dispatcher); - } - - @Test - public void testNodeReportDispatcher() throws IOException { - Configuration conf = new OzoneConfiguration(); - SCMDatanodeNodeReportHandler nodeReportHandler = - Mockito.mock(SCMDatanodeNodeReportHandler.class); - SCMDatanodeHeartbeatDispatcher dispatcher = - SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null) - .addHandler(NodeReportProto.class, nodeReportHandler) - .build(); - - DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); - NodeReportProto nodeReport = NodeReportProto.getDefaultInstance(); - SCMHeartbeatRequestProto heartbeat = - SCMHeartbeatRequestProto.newBuilder() - .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setNodeReport(nodeReport) - .build(); - dispatcher.dispatch(heartbeat); - verify(nodeReportHandler, - times(1)) - .processReport(any(DatanodeDetails.class), eq(nodeReport)); - } - - @Test - public void testContainerReportDispatcher() throws IOException { - Configuration conf = new OzoneConfiguration(); - SCMDatanodeContainerReportHandler containerReportHandler = - Mockito.mock(SCMDatanodeContainerReportHandler.class); - SCMDatanodeHeartbeatDispatcher dispatcher = - SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null) - .addHandler(ContainerReportsProto.class, containerReportHandler) - .build(); - - DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); - ContainerReportsProto containerReport = - ContainerReportsProto.getDefaultInstance(); - SCMHeartbeatRequestProto heartbeat = - SCMHeartbeatRequestProto.newBuilder() - .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setContainerReport(containerReport) - .build(); - dispatcher.dispatch(heartbeat); - verify(containerReportHandler, - times(1)) - .processReport(any(DatanodeDetails.class), - any(ContainerReportsProto.class)); - } - - @Test - public void testNodeAndContainerReportDispatcher() throws IOException { - Configuration conf = new OzoneConfiguration(); - SCMDatanodeNodeReportHandler nodeReportHandler = - Mockito.mock(SCMDatanodeNodeReportHandler.class); - SCMDatanodeContainerReportHandler containerReportHandler = - Mockito.mock(SCMDatanodeContainerReportHandler.class); - SCMDatanodeHeartbeatDispatcher dispatcher = - SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null) - .addHandler(NodeReportProto.class, nodeReportHandler) - .addHandler(ContainerReportsProto.class, containerReportHandler) - .build(); - - DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails(); - NodeReportProto nodeReport = NodeReportProto.getDefaultInstance(); - ContainerReportsProto containerReport = - ContainerReportsProto.getDefaultInstance(); - SCMHeartbeatRequestProto heartbeat = - SCMHeartbeatRequestProto.newBuilder() - .setDatanodeDetails(datanodeDetails.getProtoBufMessage()) - .setNodeReport(nodeReport) - .setContainerReport(containerReport) - .build(); - dispatcher.dispatch(heartbeat); - verify(nodeReportHandler, - times(1)) - .processReport(any(DatanodeDetails.class), any(NodeReportProto.class)); - verify(containerReportHandler, - times(1)) - .processReport(any(DatanodeDetails.class), - any(ContainerReportsProto.class)); - } - -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java deleted file mode 100644 index 30a753c024..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeNodeReportHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - - -import org.junit.Assert; -import org.junit.Test; - -/** - * Test cases to verify TestSCMDatanodeNodeReportHandler's behavior. - */ -public class TestSCMDatanodeNodeReportHandler { - - - //TODO: add test cases to verify SCMDatanodeNodeReportHandler. - - @Test - public void dummyTest() { - Assert.assertTrue(true); - } -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java deleted file mode 100644 index 4b918f76c7..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/TestSCMDatanodeReportHandlerFactory.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test cases to verify the functionality of SCMDatanodeReportHandlerFactory. - */ -public class TestSCMDatanodeReportHandlerFactory { - - @Test - public void testNodeReportHandlerConstruction() { - Configuration conf = new OzoneConfiguration(); - SCMDatanodeReportHandlerFactory factory = - new SCMDatanodeReportHandlerFactory(conf, null); - Assert.assertTrue(factory.getHandlerFor(NodeReportProto.class) - instanceof SCMDatanodeNodeReportHandler); - } - - @Test - public void testContainerReporttHandlerConstruction() { - Configuration conf = new OzoneConfiguration(); - SCMDatanodeReportHandlerFactory factory = - new SCMDatanodeReportHandlerFactory(conf, null); - Assert.assertTrue(factory.getHandlerFor(ContainerReportsProto.class) - instanceof SCMDatanodeContainerReportHandler); - } -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java deleted file mode 100644 index 4a3f59f016..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/report/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.hadoop.hdds.scm.server.report; -/** - * Contains test-cases to test Datanode report handlers in SCM. - */ \ No newline at end of file diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java deleted file mode 100644 index ecddf8eaca..0000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMetrics.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.scm; - -import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; -import static org.apache.hadoop.test.MetricsAsserts.getLongGauge; -import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.junit.Assert.assertEquals; - -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang3.RandomUtils; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.scm.server.report - .SCMDatanodeContainerReportHandler; -import org.apache.hadoop.metrics2.MetricsRecordBuilder; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.container.common.helpers.ContainerReport; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; -import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; -import org.apache.hadoop.hdds.scm.node.SCMNodeManager; -import org.apache.hadoop.test.GenericTestUtils; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; - -/** - * This class tests the metrics of Storage Container Manager. - */ -public class TestSCMMetrics { - /** - * Set the timeout for each test. - */ - @Rule - public Timeout testTimeout = new Timeout(90000); - - private static MiniOzoneCluster cluster = null; - - @Test - public void testContainerMetrics() throws Exception { - int nodeCount = 2; - int numReport = 2; - long size = OzoneConsts.GB * 5; - long used = OzoneConsts.GB * 2; - long readBytes = OzoneConsts.GB * 1; - long writeBytes = OzoneConsts.GB * 2; - int keyCount = 1000; - int readCount = 100; - int writeCount = 50; - OzoneConfiguration conf = new OzoneConfiguration(); - - try { - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(nodeCount).build(); - cluster.waitForClusterToBeReady(); - - ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes, - writeBytes, readCount, writeCount); - StorageContainerManager scmManager = cluster.getStorageContainerManager(); - DatanodeDetails fstDatanodeDetails = TestUtils.getDatanodeDetails(); - ContainerReportsProto request = createContainerReport(numReport, stat); - String fstDatanodeUuid = fstDatanodeDetails.getUuidString(); - SCMDatanodeContainerReportHandler containerReportHandler = - new SCMDatanodeContainerReportHandler(); - containerReportHandler.setConf(conf); - containerReportHandler.init(scmManager); - containerReportHandler.processReport( - fstDatanodeDetails, request); - - // verify container stat metrics - MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); - assertEquals(size * numReport, - getLongGauge("LastContainerReportSize", scmMetrics)); - assertEquals(used * numReport, - getLongGauge("LastContainerReportUsed", scmMetrics)); - assertEquals(readBytes * numReport, - getLongGauge("LastContainerReportReadBytes", scmMetrics)); - assertEquals(writeBytes * numReport, - getLongGauge("LastContainerReportWriteBytes", scmMetrics)); - - assertEquals(keyCount * numReport, - getLongGauge("LastContainerReportKeyCount", scmMetrics)); - assertEquals(readCount * numReport, - getLongGauge("LastContainerReportReadCount", scmMetrics)); - assertEquals(writeCount * numReport, - getLongGauge("LastContainerReportWriteCount", scmMetrics)); - - // add one new report - DatanodeDetails sndDatanodeDetails = TestUtils.getDatanodeDetails(); - request = createContainerReport(1, stat); - String sndDatanodeUuid = sndDatanodeDetails.getUuidString(); - containerReportHandler.processReport( - sndDatanodeDetails, request); - - scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); - assertEquals(size * (numReport + 1), - getLongCounter("ContainerReportSize", scmMetrics)); - assertEquals(used * (numReport + 1), - getLongCounter("ContainerReportUsed", scmMetrics)); - assertEquals(readBytes * (numReport + 1), - getLongCounter("ContainerReportReadBytes", scmMetrics)); - assertEquals(writeBytes * (numReport + 1), - getLongCounter("ContainerReportWriteBytes", scmMetrics)); - - assertEquals(keyCount * (numReport + 1), - getLongCounter("ContainerReportKeyCount", scmMetrics)); - assertEquals(readCount * (numReport + 1), - getLongCounter("ContainerReportReadCount", scmMetrics)); - assertEquals(writeCount * (numReport + 1), - getLongCounter("ContainerReportWriteCount", scmMetrics)); - - // Re-send reports but with different value for validating - // the aggregation. - stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6); - containerReportHandler.processReport( - fstDatanodeDetails, createContainerReport(1, stat)); - - stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1); - containerReportHandler.processReport( - sndDatanodeDetails, createContainerReport(1, stat)); - - // the global container metrics value should be updated - scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); - assertEquals(101, getLongCounter("ContainerReportSize", scmMetrics)); - assertEquals(51, getLongCounter("ContainerReportUsed", scmMetrics)); - assertEquals(51, getLongCounter("ContainerReportReadBytes", scmMetrics)); - assertEquals(61, getLongCounter("ContainerReportWriteBytes", scmMetrics)); - - assertEquals(4, getLongCounter("ContainerReportKeyCount", scmMetrics)); - assertEquals(6, getLongCounter("ContainerReportReadCount", scmMetrics)); - assertEquals(7, getLongCounter("ContainerReportWriteCount", scmMetrics)); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - @Test - public void testStaleNodeContainerReport() throws Exception { - int nodeCount = 2; - int numReport = 2; - long size = OzoneConsts.GB * 5; - long used = OzoneConsts.GB * 2; - long readBytes = OzoneConsts.GB * 1; - long writeBytes = OzoneConsts.GB * 2; - int keyCount = 1000; - int readCount = 100; - int writeCount = 50; - OzoneConfiguration conf = new OzoneConfiguration(); - - try { - cluster = MiniOzoneCluster.newBuilder(conf) - .setNumDatanodes(nodeCount).build(); - cluster.waitForClusterToBeReady(); - - ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes, - writeBytes, readCount, writeCount); - StorageContainerManager scmManager = cluster.getStorageContainerManager(); - - DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0) - .getDatanodeDetails(); - SCMDatanodeContainerReportHandler containerReportHandler = - new SCMDatanodeContainerReportHandler(); - containerReportHandler.setConf(conf); - containerReportHandler.init(scmManager); - ContainerReportsProto request = createContainerReport(numReport, stat); - containerReportHandler.processReport( - datanodeDetails, request); - - MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME); - assertEquals(size * numReport, - getLongCounter("ContainerReportSize", scmMetrics)); - assertEquals(used * numReport, - getLongCounter("ContainerReportUsed", scmMetrics)); - assertEquals(readBytes * numReport, - getLongCounter("ContainerReportReadBytes", scmMetrics)); - assertEquals(writeBytes * numReport, - getLongCounter("ContainerReportWriteBytes", scmMetrics)); - - assertEquals(keyCount * numReport, - getLongCounter("ContainerReportKeyCount", scmMetrics)); - assertEquals(readCount * numReport, - getLongCounter("ContainerReportReadCount", scmMetrics)); - assertEquals(writeCount * numReport, - getLongCounter("ContainerReportWriteCount", scmMetrics)); - - // reset stale interval time to move node from healthy to stale - SCMNodeManager nodeManager = (SCMNodeManager) cluster - .getStorageContainerManager().getScmNodeManager(); - nodeManager.setStaleNodeIntervalMs(100); - - // verify the metrics when node becomes stale - GenericTestUtils.waitFor(() -> { - MetricsRecordBuilder metrics = getMetrics(SCMMetrics.SOURCE_NAME); - return 0 == getLongCounter("ContainerReportSize", metrics) - && 0 == getLongCounter("ContainerReportUsed", metrics) - && 0 == getLongCounter("ContainerReportReadBytes", metrics) - && 0 == getLongCounter("ContainerReportWriteBytes", metrics) - && 0 == getLongCounter("ContainerReportKeyCount", metrics) - && 0 == getLongCounter("ContainerReportReadCount", metrics) - && 0 == getLongCounter("ContainerReportWriteCount", metrics); - }, 1000, 60000); - } finally { - if (cluster != null) { - cluster.shutdown(); - } - } - } - - private ContainerReportsProto createContainerReport(int numReport, - ContainerStat stat) { - StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder - reportsBuilder = StorageContainerDatanodeProtocolProtos - .ContainerReportsProto.newBuilder(); - - for (int i = 0; i < numReport; i++) { - ContainerReport report = new ContainerReport( - RandomUtils.nextLong(), DigestUtils.sha256Hex("Simulated")); - report.setSize(stat.getSize().get()); - report.setBytesUsed(stat.getUsed().get()); - report.setReadCount(stat.getReadCount().get()); - report.setReadBytes(stat.getReadBytes().get()); - report.setKeyCount(stat.getKeyCount().get()); - report.setWriteCount(stat.getWriteCount().get()); - report.setWriteBytes(stat.getWriteBytes().get()); - reportsBuilder.addReports(report.getProtoBufMessage()); - } - return reportsBuilder.build(); - } -}