HDDS-193. Make Datanode heartbeat dispatcher in SCM event based.

Contributed by Elek, Marton.
This commit is contained in:
Anu Engineer 2018-06-27 14:18:25 -07:00
parent 18932717c4
commit 8752a48564
16 changed files with 254 additions and 1072 deletions

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.TypedEvent;
import com.google.protobuf.GeneratedMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is responsible for dispatching heartbeat from datanode to
* appropriate EventHandler at SCM.
*/
public final class SCMDatanodeHeartbeatDispatcher {
private static final Logger LOG =
LoggerFactory.getLogger(SCMDatanodeHeartbeatDispatcher.class);
private EventPublisher eventPublisher;
public static final TypedEvent<NodeReportFromDatanode> NODE_REPORT =
new TypedEvent<>(NodeReportFromDatanode.class);
public static final TypedEvent<ContainerReportFromDatanode> CONTAINER_REPORT =
new TypedEvent<ContainerReportFromDatanode>(ContainerReportFromDatanode.class);
public SCMDatanodeHeartbeatDispatcher(EventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
/**
* Dispatches heartbeat to registered event handlers.
*
* @param heartbeat heartbeat to be dispatched.
*/
public void dispatch(SCMHeartbeatRequestProto heartbeat) {
DatanodeDetails datanodeDetails =
DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
if (heartbeat.hasNodeReport()) {
eventPublisher.fireEvent(NODE_REPORT,
new NodeReportFromDatanode(datanodeDetails,
heartbeat.getNodeReport()));
}
if (heartbeat.hasContainerReport()) {
eventPublisher.fireEvent(CONTAINER_REPORT,
new ContainerReportFromDatanode(datanodeDetails,
heartbeat.getContainerReport()));
}
}
/**
* Wrapper class for events with the datanode origin.
*/
public static class ReportFromDatanode<T extends GeneratedMessage> {
private final DatanodeDetails datanodeDetails;
private final T report;
public ReportFromDatanode(DatanodeDetails datanodeDetails, T report) {
this.datanodeDetails = datanodeDetails;
this.report = report;
}
public DatanodeDetails getDatanodeDetails() {
return datanodeDetails;
}
public T getReport() {
return report;
}
}
/**
* Node report event payload with origin.
*/
public static class NodeReportFromDatanode
extends ReportFromDatanode<NodeReportProto> {
public NodeReportFromDatanode(DatanodeDetails datanodeDetails,
NodeReportProto report) {
super(datanodeDetails, report);
}
}
/**
* Container report event payload with origin.
*/
public static class ContainerReportFromDatanode
extends ReportFromDatanode<ContainerReportsProto> {
public ContainerReportFromDatanode(DatanodeDetails datanodeDetails,
ContainerReportsProto report) {
super(datanodeDetails, report);
}
}
}

View File

@ -73,7 +73,7 @@
import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.server.report.SCMDatanodeHeartbeatDispatcher; import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
@ -122,14 +122,19 @@ public class SCMDatanodeProtocolServer implements
private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher; private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
public SCMDatanodeProtocolServer(final OzoneConfiguration conf, public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
StorageContainerManager scm) throws IOException { StorageContainerManager scm, EventPublisher eventPublisher)
throws IOException {
Preconditions.checkNotNull(scm, "SCM cannot be null"); Preconditions.checkNotNull(scm, "SCM cannot be null");
Preconditions.checkNotNull(eventPublisher, "EventPublisher cannot be null");
this.scm = scm; this.scm = scm;
final int handlerCount = final int handlerCount =
conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY, conf.getInt(OZONE_SCM_HANDLER_COUNT_KEY,
OZONE_SCM_HANDLER_COUNT_DEFAULT); OZONE_SCM_HANDLER_COUNT_DEFAULT);
heartbeatDispatcher = new SCMDatanodeHeartbeatDispatcher(eventPublisher);
RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class, RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
ProtobufRpcEngine.class); ProtobufRpcEngine.class);
BlockingService dnProtoPbService = BlockingService dnProtoPbService =
@ -155,10 +160,6 @@ public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, conf, OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr,
datanodeRpcServer); datanodeRpcServer);
heartbeatDispatcher = SCMDatanodeHeartbeatDispatcher.newBuilder(conf, scm)
.addHandlerFor(NodeReportProto.class)
.addHandlerFor(ContainerReportsProto.class)
.build();
} }
public void start() { public void start() {
@ -319,7 +320,6 @@ public void stop() {
try { try {
LOG.info("Stopping the RPC server for DataNodes"); LOG.info("Stopping the RPC server for DataNodes");
datanodeRpcServer.stop(); datanodeRpcServer.stop();
heartbeatDispatcher.shutdown();
} catch (Exception ex) { } catch (Exception ex) {
LOG.error(" datanodeRpcServer stop failed.", ex); LOG.error(" datanodeRpcServer stop failed.", ex);
} }

View File

@ -52,7 +52,6 @@
import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.common.StorageInfo; import org.apache.hadoop.ozone.common.StorageInfo;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -182,7 +181,8 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
scmAdminUsernames.add(scmUsername); scmAdminUsernames.add(scmUsername);
} }
datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this); datanodeProtocolServer = new SCMDatanodeProtocolServer(conf, this,
eventQueue);
blockProtocolServer = new SCMBlockProtocolServer(conf, this); blockProtocolServer = new SCMBlockProtocolServer(conf, this);
clientProtocolServer = new SCMClientProtocolServer(conf, this); clientProtocolServer = new SCMClientProtocolServer(conf, this);
httpServer = new StorageContainerManagerHttpServer(conf); httpServer = new StorageContainerManagerHttpServer(conf);

View File

@ -1,76 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Handler for Datanode Container Report.
*/
public class SCMDatanodeContainerReportHandler extends
SCMDatanodeReportHandler<ContainerReportsProto> {
private static final Logger LOG = LoggerFactory.getLogger(
SCMDatanodeContainerReportHandler.class);
@Override
public void processReport(DatanodeDetails datanodeDetails,
ContainerReportsProto report) throws IOException {
LOG.trace("Processing container report from {}.", datanodeDetails);
updateContainerReportMetrics(datanodeDetails, report);
getSCM().getScmContainerManager()
.processContainerReports(datanodeDetails, report);
}
/**
* Updates container report metrics in SCM.
*
* @param datanodeDetails Datanode Information
* @param reports Container Reports
*/
private void updateContainerReportMetrics(DatanodeDetails datanodeDetails,
ContainerReportsProto reports) {
ContainerStat newStat = new ContainerStat();
for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
.getReportsList()) {
newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
info.getReadCount(), info.getWriteCount()));
}
// update container metrics
StorageContainerManager.getMetrics().setLastContainerStat(newStat);
// Update container stat entry, this will trigger a removal operation if it
// exists in cache.
String datanodeUuid = datanodeDetails.getUuidString();
getSCM().getContainerReportCache().put(datanodeUuid, newStat);
// update global view container metrics
StorageContainerManager.getMetrics().incrContainerStat(newStat);
}
}

View File

@ -1,189 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* This class is responsible for dispatching heartbeat from datanode to
* appropriate ReportHandlers at SCM.
* Only one handler per report is supported now, it's very easy to support
* multiple handlers for a report.
*/
public final class SCMDatanodeHeartbeatDispatcher {
private static final Logger LOG = LoggerFactory.getLogger(
SCMDatanodeHeartbeatDispatcher.class);
/**
* This stores Report to Handler mapping.
*/
private final Map<Class<? extends GeneratedMessage>,
SCMDatanodeReportHandler<? extends GeneratedMessage>> handlers;
/**
* Executor service which will be used for processing reports.
*/
private final ExecutorService executorService;
/**
* Constructs SCMDatanodeHeartbeatDispatcher instance with the given
* handlers.
*
* @param handlers report to report handler mapping
*/
private SCMDatanodeHeartbeatDispatcher(Map<Class<? extends GeneratedMessage>,
SCMDatanodeReportHandler<? extends GeneratedMessage>> handlers) {
this.handlers = handlers;
this.executorService = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("SCMDatanode Heartbeat Dispatcher Thread - %d")
.build());
}
/**
* Dispatches heartbeat to registered handlers.
*
* @param heartbeat heartbeat to be dispatched.
*/
public void dispatch(SCMHeartbeatRequestProto heartbeat) {
DatanodeDetails datanodeDetails = DatanodeDetails
.getFromProtoBuf(heartbeat.getDatanodeDetails());
if (heartbeat.hasNodeReport()) {
processReport(datanodeDetails, heartbeat.getNodeReport());
}
if (heartbeat.hasContainerReport()) {
processReport(datanodeDetails, heartbeat.getContainerReport());
}
}
/**
* Invokes appropriate ReportHandler and submits the task to executor
* service for processing.
*
* @param datanodeDetails Datanode Information
* @param report Report to be processed
*/
@SuppressWarnings("unchecked")
private void processReport(DatanodeDetails datanodeDetails,
GeneratedMessage report) {
executorService.submit(() -> {
try {
SCMDatanodeReportHandler handler = handlers.get(report.getClass());
handler.processReport(datanodeDetails, report);
} catch (IOException ex) {
LOG.error("Exception wile processing report {}, from {}",
report.getClass(), datanodeDetails, ex);
}
});
}
/**
* Shuts down SCMDatanodeHeartbeatDispatcher.
*/
public void shutdown() {
executorService.shutdown();
}
/**
* Returns a new Builder to construct {@link SCMDatanodeHeartbeatDispatcher}.
*
* @param conf Configuration to be used by SCMDatanodeHeartbeatDispatcher
* @param scm {@link StorageContainerManager} instance to be used by report
* handlers
*
* @return {@link SCMDatanodeHeartbeatDispatcher.Builder} instance
*/
public static Builder newBuilder(Configuration conf,
StorageContainerManager scm) {
return new Builder(conf, scm);
}
/**
* Builder for SCMDatanodeHeartbeatDispatcher.
*/
public static class Builder {
private final SCMDatanodeReportHandlerFactory reportHandlerFactory;
private final Map<Class<? extends GeneratedMessage>,
SCMDatanodeReportHandler<? extends GeneratedMessage>> report2handler;
/**
* Constructs SCMDatanodeHeartbeatDispatcher.Builder instance.
*
* @param conf Configuration object to be used.
* @param scm StorageContainerManager instance to be used for report
* handler initialization.
*/
private Builder(Configuration conf, StorageContainerManager scm) {
this.report2handler = new HashMap<>();
this.reportHandlerFactory =
new SCMDatanodeReportHandlerFactory(conf, scm);
}
/**
* Adds new report handler for the given report.
*
* @param report Report for which handler has to be added
*
* @return Builder
*/
public Builder addHandlerFor(Class<? extends GeneratedMessage> report) {
report2handler.put(report, reportHandlerFactory.getHandlerFor(report));
return this;
}
/**
* Associates the given report handler for the given report.
*
* @param report Report to be associated with
* @param handler Handler to be used for the report
*
* @return Builder
*/
public Builder addHandler(Class<? extends GeneratedMessage> report,
SCMDatanodeReportHandler<? extends GeneratedMessage> handler) {
report2handler.put(report, handler);
return this;
}
/**
* Builds and returns {@link SCMDatanodeHeartbeatDispatcher} instance.
*
* @return SCMDatanodeHeartbeatDispatcher
*/
public SCMDatanodeHeartbeatDispatcher build() {
return new SCMDatanodeHeartbeatDispatcher(report2handler);
}
}
}

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Handles Datanode Node Report.
*/
public class SCMDatanodeNodeReportHandler extends
SCMDatanodeReportHandler<NodeReportProto> {
private static final Logger LOG = LoggerFactory.getLogger(
SCMDatanodeNodeReportHandler.class);
@Override
public void processReport(DatanodeDetails datanodeDetails,
NodeReportProto report) throws IOException {
LOG.debug("Processing node report from {}.", datanodeDetails);
//TODO: add logic to process node report.
}
}

View File

@ -1,83 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import java.io.IOException;
/**
* Datanode Report handlers should implement this interface in order to get
* call back whenever the report is received from datanode.
*
* @param <T> Type of report the handler is interested in.
*/
public abstract class SCMDatanodeReportHandler<T extends GeneratedMessage>
implements Configurable {
private Configuration config;
private StorageContainerManager scm;
/**
* Initializes SCMDatanodeReportHandler and associates it with the given
* StorageContainerManager instance.
*
* @param storageContainerManager StorageContainerManager instance to be
* associated with.
*/
public void init(StorageContainerManager storageContainerManager) {
this.scm = storageContainerManager;
}
/**
* Returns the associated StorageContainerManager instance. This will be
* used by the ReportHandler implementations.
*
* @return {@link StorageContainerManager}
*/
protected StorageContainerManager getSCM() {
return scm;
}
@Override
public void setConf(Configuration conf) {
this.config = conf;
}
@Override
public Configuration getConf() {
return config;
}
/**
* Processes the report received from datanode. Each ReportHandler
* implementation is responsible for providing the logic to process the
* report it's interested in.
*
* @param datanodeDetails Datanode Information
* @param report Report to be processed
*
* @throws IOException In case of any exception
*/
abstract void processReport(DatanodeDetails datanodeDetails, T report)
throws IOException;
}

View File

@ -1,82 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import com.google.protobuf.GeneratedMessage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.HashMap;
import java.util.Map;
/**
* Factory class to construct {@link SCMDatanodeReportHandler} given a report.
*/
public class SCMDatanodeReportHandlerFactory {
private final Configuration conf;
private final StorageContainerManager scm;
private final Map<Class<? extends GeneratedMessage>,
Class<? extends SCMDatanodeReportHandler<? extends GeneratedMessage>>>
report2handler;
/**
* Constructs {@link SCMDatanodeReportHandler} instance.
*
* @param conf Configuration to be passed to the
* {@link SCMDatanodeReportHandler}
*/
public SCMDatanodeReportHandlerFactory(Configuration conf,
StorageContainerManager scm) {
this.conf = conf;
this.scm = scm;
this.report2handler = new HashMap<>();
report2handler.put(NodeReportProto.class,
SCMDatanodeNodeReportHandler.class);
report2handler.put(ContainerReportsProto.class,
SCMDatanodeContainerReportHandler.class);
}
/**
* Returns the SCMDatanodeReportHandler for the corresponding report.
*
* @param report report
*
* @return report handler
*/
public SCMDatanodeReportHandler<? extends GeneratedMessage> getHandlerFor(
Class<? extends GeneratedMessage> report) {
Class<? extends SCMDatanodeReportHandler<? extends GeneratedMessage>>
handlerClass = report2handler.get(report);
if (handlerClass == null) {
throw new RuntimeException("No handler found for report " + report);
}
SCMDatanodeReportHandler<? extends GeneratedMessage> instance =
ReflectionUtils.newInstance(handlerClass, conf);
instance.init(scm);
return instance;
}
}

View File

@ -1,57 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
/**
* Handling of all the datanode reports in SCM which are received through
* heartbeat is done here.
*
* SCM Datanode Report Processing State Diagram:
*
* SCMDatanode SCMDatanodeHeartbeat SCMDatanodeReport
* ProtocolServer Dispatcher Handler
* | | |
* | | |
* | construct | |
* |----------------------->| |
* | | |
* | | register |
* | |<-----------------------|
* | | |
* +------------+------------------------+------------------------+--------+
* | loop | | | |
* | | | | |
* | | | | |
* | heartbeat | | | |
* - +----------->| | | |
* | from | heartbeat | | |
* | Datanode |----------------------->| | |
* | | | report | |
* | | |----------------------->| |
* | | | | |
* | DN | | | |
* <-+------------| | | |
* | commands | | | |
* | | | | |
* +------------+------------------------+------------------------+--------+
* | | |
* | | |
* | shutdown | |
* |----------------------->| |
* | | |
* | | |
* - - -
*/

View File

@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.junit.Assert;
import org.junit.Test;
/**
* This class tests the behavior of SCMDatanodeHeartbeatDispatcher.
*/
public class TestSCMDatanodeHeartbeatDispatcher {
@Test
public void testNodeReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
AtomicInteger eventReceived = new AtomicInteger();
NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
SCMDatanodeHeartbeatDispatcher dispatcher =
new SCMDatanodeHeartbeatDispatcher(new EventPublisher() {
@Override
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
EVENT_TYPE event, PAYLOAD payload) {
Assert.assertEquals(event,
SCMDatanodeHeartbeatDispatcher.NODE_REPORT);
eventReceived.incrementAndGet();
Assert.assertEquals(nodeReport, ((NodeReportFromDatanode)payload).getReport());
}
});
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setNodeReport(nodeReport)
.build();
dispatcher.dispatch(heartbeat);
Assert.assertEquals(1, eventReceived.get());
}
@Test
public void testContainerReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
AtomicInteger eventReceived = new AtomicInteger();
ContainerReportsProto containerReport =
ContainerReportsProto.getDefaultInstance();
SCMDatanodeHeartbeatDispatcher dispatcher =
new SCMDatanodeHeartbeatDispatcher(new EventPublisher() {
@Override
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
EVENT_TYPE event, PAYLOAD payload) {
Assert.assertEquals(event,
SCMDatanodeHeartbeatDispatcher.CONTAINER_REPORT);
Assert.assertEquals(containerReport, ((ContainerReportFromDatanode)payload).getReport());
eventReceived.incrementAndGet();
}
});
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setContainerReport(containerReport)
.build();
dispatcher.dispatch(heartbeat);
Assert.assertEquals(1, eventReceived.get());
}
}

View File

@ -1,34 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.junit.Assert;
import org.junit.Test;
/**
* Test cases to verify SCMDatanodeContainerReportHandler's behavior.
*/
public class TestSCMDatanodeContainerReportHandler {
//TODO: add test cases to verify SCMDatanodeContainerReportHandler.
@Test
public void dummyTest() {
Assert.assertTrue(true);
}
}

View File

@ -1,138 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* This class tests the behavior of SCMDatanodeHeartbeatDispatcher.
*/
public class TestSCMDatanodeHeartbeatDispatcher {
@Test
public void testSCMDatanodeHeartbeatDispatcherBuilder() {
Configuration conf = new OzoneConfiguration();
SCMDatanodeHeartbeatDispatcher dispatcher =
SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
.addHandlerFor(NodeReportProto.class)
.addHandlerFor(ContainerReportsProto.class)
.build();
Assert.assertNotNull(dispatcher);
}
@Test
public void testNodeReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
SCMDatanodeNodeReportHandler nodeReportHandler =
Mockito.mock(SCMDatanodeNodeReportHandler.class);
SCMDatanodeHeartbeatDispatcher dispatcher =
SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
.addHandler(NodeReportProto.class, nodeReportHandler)
.build();
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setNodeReport(nodeReport)
.build();
dispatcher.dispatch(heartbeat);
verify(nodeReportHandler,
times(1))
.processReport(any(DatanodeDetails.class), eq(nodeReport));
}
@Test
public void testContainerReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
SCMDatanodeContainerReportHandler containerReportHandler =
Mockito.mock(SCMDatanodeContainerReportHandler.class);
SCMDatanodeHeartbeatDispatcher dispatcher =
SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
.addHandler(ContainerReportsProto.class, containerReportHandler)
.build();
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
ContainerReportsProto containerReport =
ContainerReportsProto.getDefaultInstance();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setContainerReport(containerReport)
.build();
dispatcher.dispatch(heartbeat);
verify(containerReportHandler,
times(1))
.processReport(any(DatanodeDetails.class),
any(ContainerReportsProto.class));
}
@Test
public void testNodeAndContainerReportDispatcher() throws IOException {
Configuration conf = new OzoneConfiguration();
SCMDatanodeNodeReportHandler nodeReportHandler =
Mockito.mock(SCMDatanodeNodeReportHandler.class);
SCMDatanodeContainerReportHandler containerReportHandler =
Mockito.mock(SCMDatanodeContainerReportHandler.class);
SCMDatanodeHeartbeatDispatcher dispatcher =
SCMDatanodeHeartbeatDispatcher.newBuilder(conf, null)
.addHandler(NodeReportProto.class, nodeReportHandler)
.addHandler(ContainerReportsProto.class, containerReportHandler)
.build();
DatanodeDetails datanodeDetails = TestUtils.getDatanodeDetails();
NodeReportProto nodeReport = NodeReportProto.getDefaultInstance();
ContainerReportsProto containerReport =
ContainerReportsProto.getDefaultInstance();
SCMHeartbeatRequestProto heartbeat =
SCMHeartbeatRequestProto.newBuilder()
.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
.setNodeReport(nodeReport)
.setContainerReport(containerReport)
.build();
dispatcher.dispatch(heartbeat);
verify(nodeReportHandler,
times(1))
.processReport(any(DatanodeDetails.class), any(NodeReportProto.class));
verify(containerReportHandler,
times(1))
.processReport(any(DatanodeDetails.class),
any(ContainerReportsProto.class));
}
}

View File

@ -1,36 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.junit.Assert;
import org.junit.Test;
/**
* Test cases to verify TestSCMDatanodeNodeReportHandler's behavior.
*/
public class TestSCMDatanodeNodeReportHandler {
//TODO: add test cases to verify SCMDatanodeNodeReportHandler.
@Test
public void dummyTest() {
Assert.assertTrue(true);
}
}

View File

@ -1,51 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.junit.Assert;
import org.junit.Test;
/**
* Test cases to verify the functionality of SCMDatanodeReportHandlerFactory.
*/
public class TestSCMDatanodeReportHandlerFactory {
@Test
public void testNodeReportHandlerConstruction() {
Configuration conf = new OzoneConfiguration();
SCMDatanodeReportHandlerFactory factory =
new SCMDatanodeReportHandlerFactory(conf, null);
Assert.assertTrue(factory.getHandlerFor(NodeReportProto.class)
instanceof SCMDatanodeNodeReportHandler);
}
@Test
public void testContainerReporttHandlerConstruction() {
Configuration conf = new OzoneConfiguration();
SCMDatanodeReportHandlerFactory factory =
new SCMDatanodeReportHandlerFactory(conf, null);
Assert.assertTrue(factory.getHandlerFor(ContainerReportsProto.class)
instanceof SCMDatanodeContainerReportHandler);
}
}

View File

@ -1,21 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.server.report;
/**
* Contains test-cases to test Datanode report handlers in SCM.
*/

View File

@ -1,253 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.scm;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.server.report
.SCMDatanodeContainerReportHandler;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerReport;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
/**
* This class tests the metrics of Storage Container Manager.
*/
public class TestSCMMetrics {
/**
* Set the timeout for each test.
*/
@Rule
public Timeout testTimeout = new Timeout(90000);
private static MiniOzoneCluster cluster = null;
@Test
public void testContainerMetrics() throws Exception {
int nodeCount = 2;
int numReport = 2;
long size = OzoneConsts.GB * 5;
long used = OzoneConsts.GB * 2;
long readBytes = OzoneConsts.GB * 1;
long writeBytes = OzoneConsts.GB * 2;
int keyCount = 1000;
int readCount = 100;
int writeCount = 50;
OzoneConfiguration conf = new OzoneConfiguration();
try {
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(nodeCount).build();
cluster.waitForClusterToBeReady();
ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes,
writeBytes, readCount, writeCount);
StorageContainerManager scmManager = cluster.getStorageContainerManager();
DatanodeDetails fstDatanodeDetails = TestUtils.getDatanodeDetails();
ContainerReportsProto request = createContainerReport(numReport, stat);
String fstDatanodeUuid = fstDatanodeDetails.getUuidString();
SCMDatanodeContainerReportHandler containerReportHandler =
new SCMDatanodeContainerReportHandler();
containerReportHandler.setConf(conf);
containerReportHandler.init(scmManager);
containerReportHandler.processReport(
fstDatanodeDetails, request);
// verify container stat metrics
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
assertEquals(size * numReport,
getLongGauge("LastContainerReportSize", scmMetrics));
assertEquals(used * numReport,
getLongGauge("LastContainerReportUsed", scmMetrics));
assertEquals(readBytes * numReport,
getLongGauge("LastContainerReportReadBytes", scmMetrics));
assertEquals(writeBytes * numReport,
getLongGauge("LastContainerReportWriteBytes", scmMetrics));
assertEquals(keyCount * numReport,
getLongGauge("LastContainerReportKeyCount", scmMetrics));
assertEquals(readCount * numReport,
getLongGauge("LastContainerReportReadCount", scmMetrics));
assertEquals(writeCount * numReport,
getLongGauge("LastContainerReportWriteCount", scmMetrics));
// add one new report
DatanodeDetails sndDatanodeDetails = TestUtils.getDatanodeDetails();
request = createContainerReport(1, stat);
String sndDatanodeUuid = sndDatanodeDetails.getUuidString();
containerReportHandler.processReport(
sndDatanodeDetails, request);
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
assertEquals(size * (numReport + 1),
getLongCounter("ContainerReportSize", scmMetrics));
assertEquals(used * (numReport + 1),
getLongCounter("ContainerReportUsed", scmMetrics));
assertEquals(readBytes * (numReport + 1),
getLongCounter("ContainerReportReadBytes", scmMetrics));
assertEquals(writeBytes * (numReport + 1),
getLongCounter("ContainerReportWriteBytes", scmMetrics));
assertEquals(keyCount * (numReport + 1),
getLongCounter("ContainerReportKeyCount", scmMetrics));
assertEquals(readCount * (numReport + 1),
getLongCounter("ContainerReportReadCount", scmMetrics));
assertEquals(writeCount * (numReport + 1),
getLongCounter("ContainerReportWriteCount", scmMetrics));
// Re-send reports but with different value for validating
// the aggregation.
stat = new ContainerStat(100, 50, 3, 50, 60, 5, 6);
containerReportHandler.processReport(
fstDatanodeDetails, createContainerReport(1, stat));
stat = new ContainerStat(1, 1, 1, 1, 1, 1, 1);
containerReportHandler.processReport(
sndDatanodeDetails, createContainerReport(1, stat));
// the global container metrics value should be updated
scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
assertEquals(101, getLongCounter("ContainerReportSize", scmMetrics));
assertEquals(51, getLongCounter("ContainerReportUsed", scmMetrics));
assertEquals(51, getLongCounter("ContainerReportReadBytes", scmMetrics));
assertEquals(61, getLongCounter("ContainerReportWriteBytes", scmMetrics));
assertEquals(4, getLongCounter("ContainerReportKeyCount", scmMetrics));
assertEquals(6, getLongCounter("ContainerReportReadCount", scmMetrics));
assertEquals(7, getLongCounter("ContainerReportWriteCount", scmMetrics));
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testStaleNodeContainerReport() throws Exception {
int nodeCount = 2;
int numReport = 2;
long size = OzoneConsts.GB * 5;
long used = OzoneConsts.GB * 2;
long readBytes = OzoneConsts.GB * 1;
long writeBytes = OzoneConsts.GB * 2;
int keyCount = 1000;
int readCount = 100;
int writeCount = 50;
OzoneConfiguration conf = new OzoneConfiguration();
try {
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(nodeCount).build();
cluster.waitForClusterToBeReady();
ContainerStat stat = new ContainerStat(size, used, keyCount, readBytes,
writeBytes, readCount, writeCount);
StorageContainerManager scmManager = cluster.getStorageContainerManager();
DatanodeDetails datanodeDetails = cluster.getHddsDatanodes().get(0)
.getDatanodeDetails();
SCMDatanodeContainerReportHandler containerReportHandler =
new SCMDatanodeContainerReportHandler();
containerReportHandler.setConf(conf);
containerReportHandler.init(scmManager);
ContainerReportsProto request = createContainerReport(numReport, stat);
containerReportHandler.processReport(
datanodeDetails, request);
MetricsRecordBuilder scmMetrics = getMetrics(SCMMetrics.SOURCE_NAME);
assertEquals(size * numReport,
getLongCounter("ContainerReportSize", scmMetrics));
assertEquals(used * numReport,
getLongCounter("ContainerReportUsed", scmMetrics));
assertEquals(readBytes * numReport,
getLongCounter("ContainerReportReadBytes", scmMetrics));
assertEquals(writeBytes * numReport,
getLongCounter("ContainerReportWriteBytes", scmMetrics));
assertEquals(keyCount * numReport,
getLongCounter("ContainerReportKeyCount", scmMetrics));
assertEquals(readCount * numReport,
getLongCounter("ContainerReportReadCount", scmMetrics));
assertEquals(writeCount * numReport,
getLongCounter("ContainerReportWriteCount", scmMetrics));
// reset stale interval time to move node from healthy to stale
SCMNodeManager nodeManager = (SCMNodeManager) cluster
.getStorageContainerManager().getScmNodeManager();
nodeManager.setStaleNodeIntervalMs(100);
// verify the metrics when node becomes stale
GenericTestUtils.waitFor(() -> {
MetricsRecordBuilder metrics = getMetrics(SCMMetrics.SOURCE_NAME);
return 0 == getLongCounter("ContainerReportSize", metrics)
&& 0 == getLongCounter("ContainerReportUsed", metrics)
&& 0 == getLongCounter("ContainerReportReadBytes", metrics)
&& 0 == getLongCounter("ContainerReportWriteBytes", metrics)
&& 0 == getLongCounter("ContainerReportKeyCount", metrics)
&& 0 == getLongCounter("ContainerReportReadCount", metrics)
&& 0 == getLongCounter("ContainerReportWriteCount", metrics);
}, 1000, 60000);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private ContainerReportsProto createContainerReport(int numReport,
ContainerStat stat) {
StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder
reportsBuilder = StorageContainerDatanodeProtocolProtos
.ContainerReportsProto.newBuilder();
for (int i = 0; i < numReport; i++) {
ContainerReport report = new ContainerReport(
RandomUtils.nextLong(), DigestUtils.sha256Hex("Simulated"));
report.setSize(stat.getSize().get());
report.setBytesUsed(stat.getUsed().get());
report.setReadCount(stat.getReadCount().get());
report.setReadBytes(stat.getReadBytes().get());
report.setKeyCount(stat.getKeyCount().get());
report.setWriteCount(stat.getWriteCount().get());
report.setWriteBytes(stat.getWriteBytes().get());
reportsBuilder.addReports(report.getProtoBufMessage());
}
return reportsBuilder.build();
}
}