diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java new file mode 100644 index 0000000000..ea2b987036 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ContainerReportPublisher.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.ozone.OzoneConfigKeys; + +import java.util.concurrent.TimeUnit; + + +/** + * Publishes ContainerReport which will be sent to SCM as part of heartbeat. + * ContainerReport consist of the following information about each containers: + * - containerID + * - size + * - used + * - keyCount + * - readCount + * - writeCount + * - readBytes + * - writeBytes + * - finalHash + * - LifeCycleState + * + */ +public class ContainerReportPublisher extends + ReportPublisher { + + private Long containerReportInterval = null; + + @Override + protected long getReportFrequency() { + if (containerReportInterval == null) { + containerReportInterval = getConf().getTimeDuration( + OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL, + OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + } + // Add a random delay (0~30s) on top of the container report + // interval (60s) so tha the SCM is overwhelmed by the container reports + // sent in sync. + return containerReportInterval + getRandomReportDelay(); + } + + private long getRandomReportDelay() { + return RandomUtils.nextLong(0, containerReportInterval); + } + + @Override + protected ContainerReportsProto getReport() { + return ContainerReportsProto.getDefaultInstance(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/NodeReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/NodeReportPublisher.java new file mode 100644 index 0000000000..704b1f5b19 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/NodeReportPublisher.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; + +/** + * Publishes NodeReport which will be sent to SCM as part of heartbeat. + * NodeReport consist of: + * - NodeIOStats + * - VolumeReports + */ +public class NodeReportPublisher extends ReportPublisher { + + @Override + protected long getReportFrequency() { + return 90000L; + } + + @Override + protected NodeReportProto getReport() { + return NodeReportProto.getDefaultInstance(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java new file mode 100644 index 0000000000..c09282e1bf --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportManager.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.util.concurrent.HadoopExecutors; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +/** + * ReportManager is responsible for managing all the {@link ReportPublisher} + * and also provides {@link ScheduledExecutorService} to ReportPublisher + * which should be used for scheduling the reports. + */ +public final class ReportManager { + + private final StateContext context; + private final List publishers; + private final ScheduledExecutorService executorService; + + /** + * Construction of {@link ReportManager} should be done via + * {@link ReportManager.Builder}. + * + * @param context StateContext which holds the report + * @param publishers List of publishers which generates report + */ + private ReportManager(StateContext context, + List publishers) { + this.context = context; + this.publishers = publishers; + this.executorService = HadoopExecutors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Datanode ReportManager Thread - %d").build()); + } + + /** + * Initializes ReportManager, also initializes all the configured + * report publishers. + */ + public void init() { + for (ReportPublisher publisher : publishers) { + publisher.init(context, executorService); + } + } + + /** + * Shutdown the ReportManager. + */ + public void shutdown() { + executorService.shutdown(); + } + + /** + * Returns new {@link ReportManager.Builder} which can be used to construct. + * {@link ReportManager} + * @param conf - Conf + * @return builder - Builder. + */ + public static Builder newBuilder(Configuration conf) { + return new Builder(conf); + } + + /** + * Builder to construct {@link ReportManager}. + */ + public static final class Builder { + + private StateContext stateContext; + private List reportPublishers; + private ReportPublisherFactory publisherFactory; + + + private Builder(Configuration conf) { + this.reportPublishers = new ArrayList<>(); + this.publisherFactory = new ReportPublisherFactory(conf); + } + + /** + * Sets the {@link StateContext}. + * + * @param context StateContext + + * @return ReportManager.Builder + */ + public Builder setStateContext(StateContext context) { + stateContext = context; + return this; + } + + /** + * Adds publisher for the corresponding report. + * + * @param report report for which publisher needs to be added + * + * @return ReportManager.Builder + */ + public Builder addPublisherFor(Class report) { + reportPublishers.add(publisherFactory.getPublisherFor(report)); + return this; + } + + /** + * Adds new ReportPublisher to the ReportManager. + * + * @param publisher ReportPublisher + * + * @return ReportManager.Builder + */ + public Builder addPublisher(ReportPublisher publisher) { + reportPublishers.add(publisher); + return this; + } + + /** + * Build and returns ReportManager. + * + * @return {@link ReportManager} + */ + public ReportManager build() { + Preconditions.checkNotNull(stateContext); + return new ReportManager(stateContext, reportPublishers); + } + + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java new file mode 100644 index 0000000000..4ff47a0523 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisher.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.container.common.statemachine + .DatanodeStateMachine.DatanodeStates; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Abstract class responsible for scheduling the reports based on the + * configured interval. All the ReportPublishers should extend this class. + */ +public abstract class ReportPublisher + implements Configurable, Runnable { + + private Configuration config; + private StateContext context; + private ScheduledExecutorService executor; + + /** + * Initializes ReportPublisher with stateContext and executorService. + * + * @param stateContext Datanode state context + * @param executorService ScheduledExecutorService to schedule reports + */ + public void init(StateContext stateContext, + ScheduledExecutorService executorService) { + this.context = stateContext; + this.executor = executorService; + this.executor.schedule(this, + getReportFrequency(), TimeUnit.MILLISECONDS); + } + + @Override + public void setConf(Configuration conf) { + config = conf; + } + + @Override + public Configuration getConf() { + return config; + } + + @Override + public void run() { + publishReport(); + if (!executor.isShutdown() || + !(context.getState() == DatanodeStates.SHUTDOWN)) { + executor.schedule(this, + getReportFrequency(), TimeUnit.MILLISECONDS); + } + } + + /** + * Generates and publishes the report to datanode state context. + */ + private void publishReport() { + context.addReport(getReport()); + } + + /** + * Returns the frequency in which this particular report has to be scheduled. + * + * @return report interval in milliseconds + */ + protected abstract long getReportFrequency(); + + /** + * Generate and returns the report which has to be sent as part of heartbeat. + * + * @return datanode report + */ + protected abstract T getReport(); + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java new file mode 100644 index 0000000000..dc246d9428 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/ReportPublisherFactory.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.util.ReflectionUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * Factory class to construct {@link ReportPublisher} for a report. + */ +public class ReportPublisherFactory { + + private final Configuration conf; + private final Map, + Class> report2publisher; + + /** + * Constructs {@link ReportPublisherFactory} instance. + * + * @param conf Configuration to be passed to the {@link ReportPublisher} + */ + public ReportPublisherFactory(Configuration conf) { + this.conf = conf; + this.report2publisher = new HashMap<>(); + + report2publisher.put(NodeReportProto.class, NodeReportPublisher.class); + report2publisher.put(ContainerReportsProto.class, + ContainerReportPublisher.class); + } + + /** + * Returns the ReportPublisher for the corresponding report. + * + * @param report report + * + * @return report publisher + */ + public ReportPublisher getPublisherFor( + Class report) { + Class publisherClass = + report2publisher.get(report); + if (publisherClass == null) { + throw new RuntimeException("No publisher found for report " + report); + } + return ReflectionUtils.newInstance(publisherClass, conf); + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/package-info.java new file mode 100644 index 0000000000..404b37a7b0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/package-info.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.report; +/** + * Datanode Reports: As part of heartbeat, datanode has to share its current + * state with SCM. The state of datanode is split into multiple reports which + * are sent along with heartbeat in a configured frequency. + * + * This package contains code which is responsible for sending reports from + * datanode to SCM. + * + * ReportPublisherFactory: Given a report this constructs corresponding + * {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher}. + * + * ReportManager: Manages and initializes all the available ReportPublishers. + * + * ReportPublisher: Abstract class responsible for scheduling the reports + * based on the configured interval. All the ReportPublishers should extend + * {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher} + * + * How to add new report: + * + * 1. Create a new ReportPublisher class which extends + * {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher}. + * + * 2. Add a mapping Report to ReportPublisher entry in ReportPublisherFactory. + * + * 3. In DatanodeStateMachine add the report to ReportManager instance. + * + * + * + * Datanode Reports State Diagram: + * + * DatanodeStateMachine ReportManager ReportPublisher SCM + * | | | | + * | | | | + * | construct | | | + * |----------------->| | | + * | | | | + * | init | | | + * |----------------->| | | + * | | init | | + * | |------------->| | + * | | | | + * +--------+------------------+--------------+--------------------+------+ + * |loop | | | | | + * | | | publish | | | + * | |<-----------------+--------------| | | + * | | | report | | | + * | | | | | | + * | | | | | | + * | | heartbeat(rpc) | | | | + * | |------------------+--------------+------------------->| | + * | | | | | | + * | | | | | | + * +--------+------------------+--------------+--------------------+------+ + * | | | | + * | | | | + * | | | | + * | shutdown | | | + * |----------------->| | | + * | | | | + * | | | | + * - - - - + */ \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index d0a4217245..cb4319dc94 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -21,7 +21,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.apache.hadoop.ozone.container.common.report.ReportManager; +import org.apache.hadoop.ozone.container.common.statemachine.commandhandler + .CloseContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler .CommandDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler @@ -56,6 +62,7 @@ public class DatanodeStateMachine implements Closeable { private final OzoneContainer container; private DatanodeDetails datanodeDetails; private final CommandDispatcher commandDispatcher; + private final ReportManager reportManager; private long commandsHandled; private AtomicLong nextHB; private Thread stateMachineThread = null; @@ -92,6 +99,12 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails, .setContainer(container) .setContext(context) .build(); + + reportManager = ReportManager.newBuilder(conf) + .setStateContext(context) + .addPublisherFor(NodeReportProto.class) + .addPublisherFor(ContainerReportsProto.class) + .build(); } /** @@ -125,12 +138,12 @@ private void start() throws IOException { long now = 0; container.start(); + reportManager.init(); initCommandHandlerThread(conf); while (context.getState() != DatanodeStates.SHUTDOWN) { try { LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); nextHB.set(Time.monotonicNow() + heartbeatFrequency); - context.setNodeReport(container.getNodeReport()); context.execute(executorService, heartbeatFrequency, TimeUnit.MILLISECONDS); now = Time.monotonicNow(); @@ -307,6 +320,7 @@ public void join() throws InterruptedException { public synchronized void stopDaemon() { try { context.setState(DatanodeStates.SHUTDOWN); + reportManager.shutdown(); this.close(); LOG.info("Ozone container server stopped."); } catch (IOException e) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 4e3c610f77..98eb7a05f6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -16,9 +16,8 @@ */ package org.apache.hadoop.ozone.container.common.statemachine; +import com.google.protobuf.GeneratedMessage; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.NodeReportProto; import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.datanode .InitDatanodeState; @@ -28,7 +27,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.LinkedList; +import java.util.List; import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -51,8 +52,8 @@ public class StateContext { private final DatanodeStateMachine parent; private final AtomicLong stateExecutionCount; private final Configuration conf; + private final Queue reports; private DatanodeStateMachine.DatanodeStates state; - private NodeReportProto dnReport; /** * Constructs a StateContext. @@ -67,9 +68,9 @@ public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates this.state = state; this.parent = parent; commandQueue = new LinkedList<>(); + reports = new LinkedList<>(); lock = new ReentrantLock(); stateExecutionCount = new AtomicLong(0); - dnReport = NodeReportProto.getDefaultInstance(); } /** @@ -141,19 +142,53 @@ public void setState(DatanodeStateMachine.DatanodeStates state) { } /** - * Returns the node report of the datanode state context. - * @return the node report. + * Adds the report to report queue. + * + * @param report report to be added */ - public NodeReportProto getNodeReport() { - return dnReport; + public void addReport(GeneratedMessage report) { + synchronized (reports) { + reports.add(report); + } } /** - * Sets the storage location report of the datanode state context. - * @param nodeReport node report + * Returns the next report, or null if the report queue is empty. + * + * @return report */ - public void setNodeReport(NodeReportProto nodeReport) { - this.dnReport = nodeReport; + public GeneratedMessage getNextReport() { + synchronized (reports) { + return reports.poll(); + } + } + + /** + * Returns all the available reports from the report queue, or empty list if + * the queue is empty. + * + * @return List + */ + public List getAllAvailableReports() { + return getReports(Integer.MAX_VALUE); + } + + /** + * Returns available reports from the report queue with a max limit on + * list size, or empty list if the queue is empty. + * + * @return List + */ + public List getReports(int maxLimit) { + List results = new ArrayList<>(); + synchronized (reports) { + GeneratedMessage report = reports.poll(); + while(results.size() < maxLimit && report != null) { + results.add(report); + report = reports.poll(); + } + } + return results; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 337cdfbcf8..3986faf37f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.states.endpoint; import com.google.common.base.Preconditions; +import com.google.protobuf.GeneratedMessage; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; @@ -99,13 +100,13 @@ public EndpointStateMachine.EndPointStates call() throws Exception { try { Preconditions.checkState(this.datanodeDetailsProto != null); - SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder() - .setDatanodeDetails(datanodeDetailsProto) - .setNodeReport(context.getNodeReport()) - .build(); + SCMHeartbeatRequestProto.Builder requestBuilder = + SCMHeartbeatRequestProto.newBuilder() + .setDatanodeDetails(datanodeDetailsProto); + addReports(requestBuilder); SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() - .sendHeartbeat(request); + .sendHeartbeat(requestBuilder.build()); processResponse(reponse, datanodeDetailsProto); rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); rpcEndpoint.zeroMissedCount(); @@ -117,6 +118,19 @@ public EndpointStateMachine.EndPointStates call() throws Exception { return rpcEndpoint.getState(); } + /** + * Adds all the available reports to heartbeat. + * + * @param requestBuilder builder to which the report has to be added. + */ + private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) { + for (GeneratedMessage report : context.getAllAvailableReports()) { + requestBuilder.setField( + SCMHeartbeatRequestProto.getDescriptor().findFieldByName( + report.getDescriptorForType().getName()), report); + } + } + /** * Returns a builder class for HeartbeatEndpointTask task. * @return Builder. diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java new file mode 100644 index 0000000000..aae388dd5a --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportManager.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.ScheduledExecutorService; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test cases to test {@link ReportManager}. + */ +public class TestReportManager { + + @Test + public void testReportManagerInit() { + Configuration conf = new OzoneConfiguration(); + StateContext dummyContext = Mockito.mock(StateContext.class); + ReportPublisher dummyPublisher = Mockito.mock(ReportPublisher.class); + ReportManager.Builder builder = ReportManager.newBuilder(conf); + builder.setStateContext(dummyContext); + builder.addPublisher(dummyPublisher); + ReportManager reportManager = builder.build(); + reportManager.init(); + verify(dummyPublisher, times(1)).init(eq(dummyContext), + any(ScheduledExecutorService.class)); + + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java new file mode 100644 index 0000000000..067c5624f6 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.GeneratedMessage; +import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Test cases to test {@link ReportPublisher}. + */ +public class TestReportPublisher { + + /** + * Dummy report publisher for testing. + */ + private class DummyReportPublisher extends ReportPublisher { + + private final long frequency; + private int getReportCount = 0; + + DummyReportPublisher(long frequency) { + this.frequency = frequency; + } + + @Override + protected long getReportFrequency() { + return frequency; + } + + @Override + protected GeneratedMessage getReport() { + getReportCount++; + return null; + } + } + + @Test + public void testReportPublisherInit() { + ReportPublisher publisher = new DummyReportPublisher(0); + StateContext dummyContext = Mockito.mock(StateContext.class); + ScheduledExecutorService dummyExecutorService = Mockito.mock( + ScheduledExecutorService.class); + publisher.init(dummyContext, dummyExecutorService); + verify(dummyExecutorService, times(1)).schedule(publisher, + 0, TimeUnit.MILLISECONDS); + } + + @Test + public void testScheduledReport() throws InterruptedException { + ReportPublisher publisher = new DummyReportPublisher(100); + StateContext dummyContext = Mockito.mock(StateContext.class); + ScheduledExecutorService executorService = HadoopExecutors + .newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Unit test ReportManager Thread - %d").build()); + publisher.init(dummyContext, executorService); + Thread.sleep(150); + Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount); + Thread.sleep(150); + Assert.assertEquals(2, ((DummyReportPublisher)publisher).getReportCount); + executorService.shutdown(); + } + + @Test + public void testPublishReport() throws InterruptedException { + ReportPublisher publisher = new DummyReportPublisher(100); + StateContext dummyContext = Mockito.mock(StateContext.class); + ScheduledExecutorService executorService = HadoopExecutors + .newScheduledThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Unit test ReportManager Thread - %d").build()); + publisher.init(dummyContext, executorService); + Thread.sleep(150); + executorService.shutdown(); + Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount); + verify(dummyContext, times(1)).addReport(null); + + } + +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java new file mode 100644 index 0000000000..f8c5fe5e27 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisherFactory.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.container.common.report; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.NodeReportProto; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** + * Test cases to test ReportPublisherFactory. + */ +public class TestReportPublisherFactory { + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Test + public void testGetContainerReportPublisher() { + Configuration conf = new OzoneConfiguration(); + ReportPublisherFactory factory = new ReportPublisherFactory(conf); + ReportPublisher publisher = factory + .getPublisherFor(ContainerReportsProto.class); + Assert.assertEquals(ContainerReportPublisher.class, publisher.getClass()); + Assert.assertEquals(conf, publisher.getConf()); + } + + @Test + public void testGetNodeReportPublisher() { + Configuration conf = new OzoneConfiguration(); + ReportPublisherFactory factory = new ReportPublisherFactory(conf); + ReportPublisher publisher = factory + .getPublisherFor(NodeReportProto.class); + Assert.assertEquals(NodeReportPublisher.class, publisher.getClass()); + Assert.assertEquals(conf, publisher.getConf()); + } + + @Test + public void testInvalidReportPublisher() { + Configuration conf = new OzoneConfiguration(); + ReportPublisherFactory factory = new ReportPublisherFactory(conf); + exception.expect(RuntimeException.class); + exception.expectMessage("No publisher found for report"); + factory.getPublisherFor(HddsProtos.DatanodeDetailsProto.class); + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/package-info.java new file mode 100644 index 0000000000..37615bc753 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.report; +/** + * This package has test cases for all the report publishers which generates + * reports that are sent to SCM via heartbeat. + */ \ No newline at end of file