HDDS-129. Support for ReportManager in Datanode.

Contributed by Nanda Kumar.
This commit is contained in:
Anu Engineer 2018-06-05 10:31:42 -07:00
parent 920d154997
commit baebe4d52b
13 changed files with 834 additions and 19 deletions

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import java.util.concurrent.TimeUnit;
/**
* Publishes ContainerReport which will be sent to SCM as part of heartbeat.
* ContainerReport consist of the following information about each containers:
* - containerID
* - size
* - used
* - keyCount
* - readCount
* - writeCount
* - readBytes
* - writeBytes
* - finalHash
* - LifeCycleState
*
*/
public class ContainerReportPublisher extends
ReportPublisher<ContainerReportsProto> {
private Long containerReportInterval = null;
@Override
protected long getReportFrequency() {
if (containerReportInterval == null) {
containerReportInterval = getConf().getTimeDuration(
OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL,
OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
}
// Add a random delay (0~30s) on top of the container report
// interval (60s) so tha the SCM is overwhelmed by the container reports
// sent in sync.
return containerReportInterval + getRandomReportDelay();
}
private long getRandomReportDelay() {
return RandomUtils.nextLong(0, containerReportInterval);
}
@Override
protected ContainerReportsProto getReport() {
return ContainerReportsProto.getDefaultInstance();
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
/**
* Publishes NodeReport which will be sent to SCM as part of heartbeat.
* NodeReport consist of:
* - NodeIOStats
* - VolumeReports
*/
public class NodeReportPublisher extends ReportPublisher<NodeReportProto> {
@Override
protected long getReportFrequency() {
return 90000L;
}
@Override
protected NodeReportProto getReport() {
return NodeReportProto.getDefaultInstance();
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
/**
* ReportManager is responsible for managing all the {@link ReportPublisher}
* and also provides {@link ScheduledExecutorService} to ReportPublisher
* which should be used for scheduling the reports.
*/
public final class ReportManager {
private final StateContext context;
private final List<ReportPublisher> publishers;
private final ScheduledExecutorService executorService;
/**
* Construction of {@link ReportManager} should be done via
* {@link ReportManager.Builder}.
*
* @param context StateContext which holds the report
* @param publishers List of publishers which generates report
*/
private ReportManager(StateContext context,
List<ReportPublisher> publishers) {
this.context = context;
this.publishers = publishers;
this.executorService = HadoopExecutors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Datanode ReportManager Thread - %d").build());
}
/**
* Initializes ReportManager, also initializes all the configured
* report publishers.
*/
public void init() {
for (ReportPublisher publisher : publishers) {
publisher.init(context, executorService);
}
}
/**
* Shutdown the ReportManager.
*/
public void shutdown() {
executorService.shutdown();
}
/**
* Returns new {@link ReportManager.Builder} which can be used to construct.
* {@link ReportManager}
* @param conf - Conf
* @return builder - Builder.
*/
public static Builder newBuilder(Configuration conf) {
return new Builder(conf);
}
/**
* Builder to construct {@link ReportManager}.
*/
public static final class Builder {
private StateContext stateContext;
private List<ReportPublisher> reportPublishers;
private ReportPublisherFactory publisherFactory;
private Builder(Configuration conf) {
this.reportPublishers = new ArrayList<>();
this.publisherFactory = new ReportPublisherFactory(conf);
}
/**
* Sets the {@link StateContext}.
*
* @param context StateContext
* @return ReportManager.Builder
*/
public Builder setStateContext(StateContext context) {
stateContext = context;
return this;
}
/**
* Adds publisher for the corresponding report.
*
* @param report report for which publisher needs to be added
*
* @return ReportManager.Builder
*/
public Builder addPublisherFor(Class<? extends GeneratedMessage> report) {
reportPublishers.add(publisherFactory.getPublisherFor(report));
return this;
}
/**
* Adds new ReportPublisher to the ReportManager.
*
* @param publisher ReportPublisher
*
* @return ReportManager.Builder
*/
public Builder addPublisher(ReportPublisher publisher) {
reportPublishers.add(publisher);
return this;
}
/**
* Build and returns ReportManager.
*
* @return {@link ReportManager}
*/
public ReportManager build() {
Preconditions.checkNotNull(stateContext);
return new ReportManager(stateContext, reportPublishers);
}
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.container.common.statemachine
.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Abstract class responsible for scheduling the reports based on the
* configured interval. All the ReportPublishers should extend this class.
*/
public abstract class ReportPublisher<T extends GeneratedMessage>
implements Configurable, Runnable {
private Configuration config;
private StateContext context;
private ScheduledExecutorService executor;
/**
* Initializes ReportPublisher with stateContext and executorService.
*
* @param stateContext Datanode state context
* @param executorService ScheduledExecutorService to schedule reports
*/
public void init(StateContext stateContext,
ScheduledExecutorService executorService) {
this.context = stateContext;
this.executor = executorService;
this.executor.schedule(this,
getReportFrequency(), TimeUnit.MILLISECONDS);
}
@Override
public void setConf(Configuration conf) {
config = conf;
}
@Override
public Configuration getConf() {
return config;
}
@Override
public void run() {
publishReport();
if (!executor.isShutdown() ||
!(context.getState() == DatanodeStates.SHUTDOWN)) {
executor.schedule(this,
getReportFrequency(), TimeUnit.MILLISECONDS);
}
}
/**
* Generates and publishes the report to datanode state context.
*/
private void publishReport() {
context.addReport(getReport());
}
/**
* Returns the frequency in which this particular report has to be scheduled.
*
* @return report interval in milliseconds
*/
protected abstract long getReportFrequency();
/**
* Generate and returns the report which has to be sent as part of heartbeat.
*
* @return datanode report
*/
protected abstract T getReport();
}

View File

@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.HashMap;
import java.util.Map;
/**
* Factory class to construct {@link ReportPublisher} for a report.
*/
public class ReportPublisherFactory {
private final Configuration conf;
private final Map<Class<? extends GeneratedMessage>,
Class<? extends ReportPublisher>> report2publisher;
/**
* Constructs {@link ReportPublisherFactory} instance.
*
* @param conf Configuration to be passed to the {@link ReportPublisher}
*/
public ReportPublisherFactory(Configuration conf) {
this.conf = conf;
this.report2publisher = new HashMap<>();
report2publisher.put(NodeReportProto.class, NodeReportPublisher.class);
report2publisher.put(ContainerReportsProto.class,
ContainerReportPublisher.class);
}
/**
* Returns the ReportPublisher for the corresponding report.
*
* @param report report
*
* @return report publisher
*/
public ReportPublisher getPublisherFor(
Class<? extends GeneratedMessage> report) {
Class<? extends ReportPublisher> publisherClass =
report2publisher.get(report);
if (publisherClass == null) {
throw new RuntimeException("No publisher found for report " + report);
}
return ReflectionUtils.newInstance(publisherClass, conf);
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.report;
/**
* Datanode Reports: As part of heartbeat, datanode has to share its current
* state with SCM. The state of datanode is split into multiple reports which
* are sent along with heartbeat in a configured frequency.
*
* This package contains code which is responsible for sending reports from
* datanode to SCM.
*
* ReportPublisherFactory: Given a report this constructs corresponding
* {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher}.
*
* ReportManager: Manages and initializes all the available ReportPublishers.
*
* ReportPublisher: Abstract class responsible for scheduling the reports
* based on the configured interval. All the ReportPublishers should extend
* {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher}
*
* How to add new report:
*
* 1. Create a new ReportPublisher class which extends
* {@link org.apache.hadoop.ozone.container.common.report.ReportPublisher}.
*
* 2. Add a mapping Report to ReportPublisher entry in ReportPublisherFactory.
*
* 3. In DatanodeStateMachine add the report to ReportManager instance.
*
*
*
* Datanode Reports State Diagram:
*
* DatanodeStateMachine ReportManager ReportPublisher SCM
* | | | |
* | | | |
* | construct | | |
* |----------------->| | |
* | | | |
* | init | | |
* |----------------->| | |
* | | init | |
* | |------------->| |
* | | | |
* +--------+------------------+--------------+--------------------+------+
* |loop | | | | |
* | | | publish | | |
* | |<-----------------+--------------| | |
* | | | report | | |
* | | | | | |
* | | | | | |
* | | heartbeat(rpc) | | | |
* | |------------------+--------------+------------------->| |
* | | | | | |
* | | | | | |
* +--------+------------------+--------------+--------------------+------+
* | | | |
* | | | |
* | | | |
* | shutdown | | |
* |----------------->| | |
* | | | |
* | | | |
* - - - -
*/

View File

@ -21,7 +21,13 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler; import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CloseContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.CommandDispatcher; .CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
@ -56,6 +62,7 @@ public class DatanodeStateMachine implements Closeable {
private final OzoneContainer container; private final OzoneContainer container;
private DatanodeDetails datanodeDetails; private DatanodeDetails datanodeDetails;
private final CommandDispatcher commandDispatcher; private final CommandDispatcher commandDispatcher;
private final ReportManager reportManager;
private long commandsHandled; private long commandsHandled;
private AtomicLong nextHB; private AtomicLong nextHB;
private Thread stateMachineThread = null; private Thread stateMachineThread = null;
@ -92,6 +99,12 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.setContainer(container) .setContainer(container)
.setContext(context) .setContext(context)
.build(); .build();
reportManager = ReportManager.newBuilder(conf)
.setStateContext(context)
.addPublisherFor(NodeReportProto.class)
.addPublisherFor(ContainerReportsProto.class)
.build();
} }
/** /**
@ -125,12 +138,12 @@ private void start() throws IOException {
long now = 0; long now = 0;
container.start(); container.start();
reportManager.init();
initCommandHandlerThread(conf); initCommandHandlerThread(conf);
while (context.getState() != DatanodeStates.SHUTDOWN) { while (context.getState() != DatanodeStates.SHUTDOWN) {
try { try {
LOG.debug("Executing cycle Number : {}", context.getExecutionCount()); LOG.debug("Executing cycle Number : {}", context.getExecutionCount());
nextHB.set(Time.monotonicNow() + heartbeatFrequency); nextHB.set(Time.monotonicNow() + heartbeatFrequency);
context.setNodeReport(container.getNodeReport());
context.execute(executorService, heartbeatFrequency, context.execute(executorService, heartbeatFrequency,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
now = Time.monotonicNow(); now = Time.monotonicNow();
@ -307,6 +320,7 @@ public void join() throws InterruptedException {
public synchronized void stopDaemon() { public synchronized void stopDaemon() {
try { try {
context.setState(DatanodeStates.SHUTDOWN); context.setState(DatanodeStates.SHUTDOWN);
reportManager.shutdown();
this.close(); this.close();
LOG.info("Ozone container server stopped."); LOG.info("Ozone container server stopped.");
} catch (IOException e) { } catch (IOException e) {

View File

@ -16,9 +16,8 @@
*/ */
package org.apache.hadoop.ozone.container.common.statemachine; package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.ozone.container.common.states.DatanodeState; import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode import org.apache.hadoop.ozone.container.common.states.datanode
.InitDatanodeState; .InitDatanodeState;
@ -28,7 +27,9 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -51,8 +52,8 @@ public class StateContext {
private final DatanodeStateMachine parent; private final DatanodeStateMachine parent;
private final AtomicLong stateExecutionCount; private final AtomicLong stateExecutionCount;
private final Configuration conf; private final Configuration conf;
private final Queue<GeneratedMessage> reports;
private DatanodeStateMachine.DatanodeStates state; private DatanodeStateMachine.DatanodeStates state;
private NodeReportProto dnReport;
/** /**
* Constructs a StateContext. * Constructs a StateContext.
@ -67,9 +68,9 @@ public StateContext(Configuration conf, DatanodeStateMachine.DatanodeStates
this.state = state; this.state = state;
this.parent = parent; this.parent = parent;
commandQueue = new LinkedList<>(); commandQueue = new LinkedList<>();
reports = new LinkedList<>();
lock = new ReentrantLock(); lock = new ReentrantLock();
stateExecutionCount = new AtomicLong(0); stateExecutionCount = new AtomicLong(0);
dnReport = NodeReportProto.getDefaultInstance();
} }
/** /**
@ -141,19 +142,53 @@ public void setState(DatanodeStateMachine.DatanodeStates state) {
} }
/** /**
* Returns the node report of the datanode state context. * Adds the report to report queue.
* @return the node report. *
* @param report report to be added
*/ */
public NodeReportProto getNodeReport() { public void addReport(GeneratedMessage report) {
return dnReport; synchronized (reports) {
reports.add(report);
}
} }
/** /**
* Sets the storage location report of the datanode state context. * Returns the next report, or null if the report queue is empty.
* @param nodeReport node report *
* @return report
*/ */
public void setNodeReport(NodeReportProto nodeReport) { public GeneratedMessage getNextReport() {
this.dnReport = nodeReport; synchronized (reports) {
return reports.poll();
}
}
/**
* Returns all the available reports from the report queue, or empty list if
* the queue is empty.
*
* @return List<reports>
*/
public List<GeneratedMessage> getAllAvailableReports() {
return getReports(Integer.MAX_VALUE);
}
/**
* Returns available reports from the report queue with a max limit on
* list size, or empty list if the queue is empty.
*
* @return List<reports>
*/
public List<GeneratedMessage> getReports(int maxLimit) {
List<GeneratedMessage> results = new ArrayList<>();
synchronized (reports) {
GeneratedMessage report = reports.poll();
while(results.size() < maxLimit && report != null) {
results.add(report);
report = reports.poll();
}
}
return results;
} }
/** /**

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.states.endpoint; package org.apache.hadoop.ozone.container.common.states.endpoint;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
@ -99,13 +100,13 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
try { try {
Preconditions.checkState(this.datanodeDetailsProto != null); Preconditions.checkState(this.datanodeDetailsProto != null);
SCMHeartbeatRequestProto request = SCMHeartbeatRequestProto.newBuilder() SCMHeartbeatRequestProto.Builder requestBuilder =
.setDatanodeDetails(datanodeDetailsProto) SCMHeartbeatRequestProto.newBuilder()
.setNodeReport(context.getNodeReport()) .setDatanodeDetails(datanodeDetailsProto);
.build(); addReports(requestBuilder);
SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint() SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
.sendHeartbeat(request); .sendHeartbeat(requestBuilder.build());
processResponse(reponse, datanodeDetailsProto); processResponse(reponse, datanodeDetailsProto);
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now()); rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount(); rpcEndpoint.zeroMissedCount();
@ -117,6 +118,19 @@ public EndpointStateMachine.EndPointStates call() throws Exception {
return rpcEndpoint.getState(); return rpcEndpoint.getState();
} }
/**
* Adds all the available reports to heartbeat.
*
* @param requestBuilder builder to which the report has to be added.
*/
private void addReports(SCMHeartbeatRequestProto.Builder requestBuilder) {
for (GeneratedMessage report : context.getAllAvailableReports()) {
requestBuilder.setField(
SCMHeartbeatRequestProto.getDescriptor().findFieldByName(
report.getDescriptorForType().getName()), report);
}
}
/** /**
* Returns a builder class for HeartbeatEndpointTask task. * Returns a builder class for HeartbeatEndpointTask task.
* @return Builder. * @return Builder.

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.concurrent.ScheduledExecutorService;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Test cases to test {@link ReportManager}.
*/
public class TestReportManager {
@Test
public void testReportManagerInit() {
Configuration conf = new OzoneConfiguration();
StateContext dummyContext = Mockito.mock(StateContext.class);
ReportPublisher dummyPublisher = Mockito.mock(ReportPublisher.class);
ReportManager.Builder builder = ReportManager.newBuilder(conf);
builder.setStateContext(dummyContext);
builder.addPublisher(dummyPublisher);
ReportManager reportManager = builder.build();
reportManager.init();
verify(dummyPublisher, times(1)).init(eq(dummyContext),
any(ScheduledExecutorService.class));
}
}

View File

@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Test cases to test {@link ReportPublisher}.
*/
public class TestReportPublisher {
/**
* Dummy report publisher for testing.
*/
private class DummyReportPublisher extends ReportPublisher {
private final long frequency;
private int getReportCount = 0;
DummyReportPublisher(long frequency) {
this.frequency = frequency;
}
@Override
protected long getReportFrequency() {
return frequency;
}
@Override
protected GeneratedMessage getReport() {
getReportCount++;
return null;
}
}
@Test
public void testReportPublisherInit() {
ReportPublisher publisher = new DummyReportPublisher(0);
StateContext dummyContext = Mockito.mock(StateContext.class);
ScheduledExecutorService dummyExecutorService = Mockito.mock(
ScheduledExecutorService.class);
publisher.init(dummyContext, dummyExecutorService);
verify(dummyExecutorService, times(1)).schedule(publisher,
0, TimeUnit.MILLISECONDS);
}
@Test
public void testScheduledReport() throws InterruptedException {
ReportPublisher publisher = new DummyReportPublisher(100);
StateContext dummyContext = Mockito.mock(StateContext.class);
ScheduledExecutorService executorService = HadoopExecutors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Unit test ReportManager Thread - %d").build());
publisher.init(dummyContext, executorService);
Thread.sleep(150);
Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
Thread.sleep(150);
Assert.assertEquals(2, ((DummyReportPublisher)publisher).getReportCount);
executorService.shutdown();
}
@Test
public void testPublishReport() throws InterruptedException {
ReportPublisher publisher = new DummyReportPublisher(100);
StateContext dummyContext = Mockito.mock(StateContext.class);
ScheduledExecutorService executorService = HadoopExecutors
.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Unit test ReportManager Thread - %d").build());
publisher.init(dummyContext, executorService);
Thread.sleep(150);
executorService.shutdown();
Assert.assertEquals(1, ((DummyReportPublisher)publisher).getReportCount);
verify(dummyContext, times(1)).addReport(null);
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.report;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
/**
* Test cases to test ReportPublisherFactory.
*/
public class TestReportPublisherFactory {
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void testGetContainerReportPublisher() {
Configuration conf = new OzoneConfiguration();
ReportPublisherFactory factory = new ReportPublisherFactory(conf);
ReportPublisher publisher = factory
.getPublisherFor(ContainerReportsProto.class);
Assert.assertEquals(ContainerReportPublisher.class, publisher.getClass());
Assert.assertEquals(conf, publisher.getConf());
}
@Test
public void testGetNodeReportPublisher() {
Configuration conf = new OzoneConfiguration();
ReportPublisherFactory factory = new ReportPublisherFactory(conf);
ReportPublisher publisher = factory
.getPublisherFor(NodeReportProto.class);
Assert.assertEquals(NodeReportPublisher.class, publisher.getClass());
Assert.assertEquals(conf, publisher.getConf());
}
@Test
public void testInvalidReportPublisher() {
Configuration conf = new OzoneConfiguration();
ReportPublisherFactory factory = new ReportPublisherFactory(conf);
exception.expect(RuntimeException.class);
exception.expectMessage("No publisher found for report");
factory.getPublisherFor(HddsProtos.DatanodeDetailsProto.class);
}
}

View File

@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.common.report;
/**
* This package has test cases for all the report publishers which generates
* reports that are sent to SCM via heartbeat.
*/