diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
index f93c54bbc1..b2b0df2a2a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java
@@ -147,7 +147,12 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
     for (EventHandler handler : executorAndHandlers.getValue()) {
       queuedCount.incrementAndGet();
-
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Delivering event {} to executor/handler {}: {}",
+            event.getName(),
+            executorAndHandlers.getKey().getName(),
+            payload);
+      }
       executorAndHandlers.getKey()
           .onMessage(handler, payload, this);
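For orientation, here is a minimal sketch of how this queue is driven, so it is clear when the new debug line fires. It is illustrative only, reusing the `TypedEvent`/`addHandler`/`fireEvent` API that appears elsewhere in this patch; the two-argument `TypedEvent` constructor is assumed.

```java
// Sketch, not part of the patch: one onMessage() call, and now one DEBUG
// line, is produced per registered handler each time the event is fired.
EventQueue eventQueue = new EventQueue();
TypedEvent<String> demoEvent = new TypedEvent<>(String.class, "DemoEvent");

eventQueue.addHandler(demoEvent,
    (payload, publisher) -> System.out.println("handled: " + payload));

eventQueue.fireEvent(demoEvent, "hello"); // logs "Delivering event DemoEvent ..."
```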
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
index 486162e3d3..b26eed2c75 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java
@@ -18,30 +18,132 @@
 package org.apache.hadoop.hdds.scm.container;
 
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.node.states.ReportResult;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Handles container reports from datanode.
  */
 public class ContainerReportHandler implements
     EventHandler<ContainerReportFromDatanode> {
 
-  private final Mapping containerMapping;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReportHandler.class);
+
+  private final Node2ContainerMap node2ContainerMap;
+
+  private final Mapping containerMapping;
+
+  private ContainerStateManager containerStateManager;
+
+  private ReplicationActivityStatus replicationStatus;
+
   public ContainerReportHandler(Mapping containerMapping,
-      Node2ContainerMap node2ContainerMap) {
+      Node2ContainerMap node2ContainerMap,
+      ReplicationActivityStatus replicationActivityStatus) {
+    Preconditions.checkNotNull(containerMapping);
+    Preconditions.checkNotNull(node2ContainerMap);
+    Preconditions.checkNotNull(replicationActivityStatus);
     this.containerMapping = containerMapping;
     this.node2ContainerMap = node2ContainerMap;
+    this.containerStateManager = containerMapping.getStateManager();
+    this.replicationStatus = replicationActivityStatus;
   }
 
   @Override
   public void onMessage(ContainerReportFromDatanode containerReportFromDatanode,
       EventPublisher publisher) {
-    // TODO: process container report.
+
+    DatanodeDetails datanodeOrigin =
+        containerReportFromDatanode.getDatanodeDetails();
+
+    ContainerReportsProto containerReport =
+        containerReportFromDatanode.getReport();
+    try {
+
+      // Update state in the container DB and trigger close-container events.
+      containerMapping.processContainerReports(datanodeOrigin, containerReport);
+
+      Set<ContainerID> containerIds = containerReport.getReportsList().stream()
+          .map(containerProto -> containerProto.getContainerID())
+          .map(ContainerID::new)
+          .collect(Collectors.toSet());
+
+      ReportResult reportResult = node2ContainerMap
+          .processReport(datanodeOrigin.getUuid(), containerIds);
+
+      // We have the report, so we can update the states for the next iteration.
+      node2ContainerMap
+          .setContainersForDatanode(datanodeOrigin.getUuid(), containerIds);
+
+      for (ContainerID containerID : reportResult.getMissingContainers()) {
+        containerStateManager
+            .removeContainerReplica(containerID, datanodeOrigin);
+        emitReplicationRequestEvent(containerID, publisher);
+      }
+
+      for (ContainerID containerID : reportResult.getNewContainers()) {
+        containerStateManager.addContainerReplica(containerID, datanodeOrigin);
+
+        emitReplicationRequestEvent(containerID, publisher);
+      }
+
+    } catch (IOException e) {
+      // TODO: stop all the replication?
+      LOG.error("Error on processing container report from datanode {}",
+          datanodeOrigin, e);
+    }
+
+  }
+
+  private void emitReplicationRequestEvent(ContainerID containerID,
+      EventPublisher publisher) throws SCMException {
+    ContainerInfo container = containerStateManager.getContainer(containerID);
+
+    if (container == null) {
+      // Unknown container: warn and skip the replication request.
+      LOG.warn(
+          "Container is missing from containerStateManager. Can't request "
+              + "replication. {}",
+          containerID);
+      return;
+    }
+    if (replicationStatus.isReplicationEnabled()) {
+
+      int existingReplicas =
+          containerStateManager.getContainerReplicas(containerID).size();
+
+      int expectedReplicas = container.getReplicationFactor().getNumber();
+
+      if (existingReplicas != expectedReplicas) {
+
+        publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+            new ReplicationRequest(containerID.getId(), existingReplicas,
+                expectedReplicas));
+      }
+    }
+  }
 }
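The heart of the handler is the report diff computed by `Node2ContainerMap.processReport`. A self-contained sketch of that computation with hypothetical container IDs (plain `java.util`, not code from this patch):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// A report diff is two set differences: containers previously known for the
// datanode but absent from the report are "missing" (re-replication
// candidates); reported but previously unknown ones are "new" (possible
// over-replication).
public final class ReportDiffSketch {
  public static void main(String[] args) {
    Set<Long> known = new HashSet<>(Arrays.asList(1L, 2L, 3L));
    Set<Long> reported = new HashSet<>(Arrays.asList(2L, 3L, 4L));

    Set<Long> missing = new HashSet<>(known);
    missing.removeAll(reported);
    Set<Long> added = new HashSet<>(reported);
    added.removeAll(known);

    System.out.println("missing=" + missing + " new=" + added);
    // prints: missing=[1] new=[4]
  }
}
```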
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
new file mode 100644
index 0000000000..4a9888cf3e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatus.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import javax.management.ObjectName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.metrics2.util.MBeans;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event listener to track the current state of replication.
+ */
+public class ReplicationActivityStatus
+    implements EventHandler<Boolean>, ReplicationActivityStatusMXBean,
+    Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationActivityStatus.class);
+
+  private final AtomicBoolean replicationEnabled = new AtomicBoolean();
+
+  private ObjectName jmxObjectName;
+
+  public boolean isReplicationEnabled() {
+    return replicationEnabled.get();
+  }
+
+  @VisibleForTesting
+  public void setReplicationEnabled(boolean enabled) {
+    replicationEnabled.set(enabled);
+  }
+
+  @VisibleForTesting
+  public void enableReplication() {
+    replicationEnabled.set(true);
+  }
+
+  /**
+   * The replication status can be set by asynchronous events.
+   */
+  @Override
+  public void onMessage(Boolean enabled, EventPublisher publisher) {
+    replicationEnabled.set(enabled);
+  }
+
+  public void start() {
+    try {
+      this.jmxObjectName =
+          MBeans.register(
+              "StorageContainerManager", "ReplicationActivityStatus", this);
+    } catch (Exception ex) {
+      LOG.error("JMX bean for ReplicationActivityStatus can't be registered",
+          ex);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.jmxObjectName != null) {
+      MBeans.unregister(jmxObjectName);
+    }
+  }
+}
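A sketch of how this status object is meant to be wired and toggled. It mirrors the StorageContainerManager changes later in this diff; `processAll` is assumed here to be the queue-draining helper used by tests, since plain `fireEvent` delivery is asynchronous.

```java
// Illustrative wiring, not part of the patch.
EventQueue eventQueue = new EventQueue();
ReplicationActivityStatus replicationStatus = new ReplicationActivityStatus();
replicationStatus.start(); // registers the JMX bean

// The status object listens for START_REPLICATION ...
eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);

// ... and is flipped once the cluster view is considered complete:
eventQueue.fireEvent(SCMEvents.START_REPLICATION, Boolean.TRUE);
eventQueue.processAll(1000); // assumed helper: drain async deliveries
assert replicationStatus.isReplicationEnabled();
```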
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
new file mode 100644
index 0000000000..164bd247ef
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationActivityStatusMXBean.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+/**
+ * JMX interface to monitor replication status.
+ */
+public interface ReplicationActivityStatusMXBean {
+
+  boolean isReplicationEnabled();
+
+  void setReplicationEnabled(boolean enabled);
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
index ef7c546641..d40cd9cd17 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
@@ -29,18 +29,24 @@ public class ReplicationRequest implements Comparable<ReplicationRequest>, Serializable {
 
   private final long containerId;
-  private final short replicationCount;
-  private final short expecReplicationCount;
+  private final int replicationCount;
+  private final int expecReplicationCount;
   private final long timestamp;
 
-  public ReplicationRequest(long containerId, short replicationCount,
-      long timestamp, short expecReplicationCount) {
+  public ReplicationRequest(long containerId, int replicationCount,
+      long timestamp, int expecReplicationCount) {
     this.containerId = containerId;
     this.replicationCount = replicationCount;
     this.timestamp = timestamp;
     this.expecReplicationCount = expecReplicationCount;
   }
 
+  public ReplicationRequest(long containerId, int replicationCount,
+      int expecReplicationCount) {
+    this(containerId, replicationCount, System.currentTimeMillis(),
+        expecReplicationCount);
+  }
+
   /**
    * Compares this object with the specified object for order. Returns a
    * negative integer, zero, or a positive integer as this object is less
@@ -93,7 +99,7 @@ public long getContainerId() {
     return containerId;
   }
 
-  public short getReplicationCount() {
+  public int getReplicationCount() {
     return replicationCount;
   }
 
@@ -101,7 +107,17 @@ public long getTimestamp() {
     return timestamp;
   }
 
-  public short getExpecReplicationCount() {
+  public int getExpecReplicationCount() {
     return expecReplicationCount;
   }
+
+  @Override
+  public String toString() {
+    return "ReplicationRequest{" +
+        "containerId=" + containerId +
+        ", replicationCount=" + replicationCount +
+        ", expecReplicationCount=" + expecReplicationCount +
+        ", timestamp=" + timestamp +
+        '}';
+  }
 }
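The widened `int` API and the new convenience constructor in one hypothetical call (the three-argument form stamps `System.currentTimeMillis()` itself):

```java
// Hypothetical values: container 42 has 2 live replicas, 3 expected.
ReplicationRequest req = new ReplicationRequest(42L, 2, 3);

// The new toString() makes queued requests readable in logs, e.g.:
// ReplicationRequest{containerId=42, replicationCount=2,
//     expecReplicationCount=3, timestamp=1530000000000}
System.out.println(req);
```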
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index d49dd4fe5c..70b1e9635a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -173,6 +173,15 @@ public final class SCMEvents {
   public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
       new TypedEvent<>(ReplicationCompleted.class);
 
+  /**
+   * Signal for all the components (but especially for the replication
+   * manager and the container report handler) that replication can be
+   * started. Should be sent only when (almost) all the container states
+   * are available from the datanodes.
+   */
+  public static final TypedEvent<Boolean> START_REPLICATION =
+      new TypedEvent<>(Boolean.class);
+
   /**
    * Private Ctor. Never Constructed.
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
index 1960604906..8ed6d59d02 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMap.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -68,7 +69,8 @@ public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
       throws SCMException {
     Preconditions.checkNotNull(containerIDs);
     Preconditions.checkNotNull(datanodeID);
-    if(dn2ContainerMap.putIfAbsent(datanodeID, containerIDs) != null) {
+    if (dn2ContainerMap.putIfAbsent(datanodeID, new HashSet<>(containerIDs))
+        != null) {
       throw new SCMException("Node already exists in the map",
           DUPLICATE_DATANODE);
     }
@@ -82,11 +84,13 @@ public void insertNewDatanode(UUID datanodeID, Set<ContainerID> containerIDs)
    * @throws SCMException - if we don't know about this datanode, for new DN
    *                      use insertNewDatanode.
    */
-  public void updateDatanodeMap(UUID datanodeID, Set<ContainerID> containers)
+  public void setContainersForDatanode(UUID datanodeID, Set<ContainerID> containers)
       throws SCMException {
     Preconditions.checkNotNull(datanodeID);
     Preconditions.checkNotNull(containers);
-    if(dn2ContainerMap.computeIfPresent(datanodeID, (k, v) -> v) == null){
+    if (dn2ContainerMap
+        .computeIfPresent(datanodeID, (k, v) -> new HashSet<>(containers))
+        == null) {
       throw new SCMException("No such datanode", NO_SUCH_DATANODE);
     }
   }
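Both rewritten methods now store a copy (`new HashSet<>(...)`) of the caller's set. A standalone sketch of the aliasing bug this prevents (hypothetical IDs, plain `java.util`):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Without the defensive copy the map would hold a reference to the caller's
// set, and later caller-side mutations would silently rewrite SCM's view.
public final class DefensiveCopySketch {
  public static void main(String[] args) {
    Map<UUID, Set<Long>> node2Containers = new ConcurrentHashMap<>();
    UUID datanode = UUID.randomUUID();

    Set<Long> reported = new HashSet<>(Arrays.asList(1L, 2L));
    node2Containers.put(datanode, new HashSet<>(reported)); // copy, as above

    reported.remove(1L); // the caller reuses its set...
    System.out.println(node2Containers.get(datanode)); // ...still [1, 2]
  }
}
```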
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
index cb06cb3eea..26976295d4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/ReportResult.java
@@ -21,10 +21,13 @@
 
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 
+import java.util.Collections;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
+
 /**
- * A Container Report gets processsed by the Node2Container and returns the
+ * A container report gets processed by the Node2ContainerMap and returns the
  * Report Result class.
  */
 public class ReportResult {
@@ -36,6 +39,8 @@ public class ReportResult {
       Set<ContainerID> missingContainers,
       Set<ContainerID> newContainers) {
     this.status = status;
+    Preconditions.checkNotNull(missingContainers);
+    Preconditions.checkNotNull(newContainers);
     this.missingContainers = missingContainers;
     this.newContainers = newContainers;
   }
@@ -80,7 +85,17 @@ public ReportResultBuilder setNewContainers(
     }
 
     ReportResult build() {
-      return new ReportResult(status, missingContainers, newContainers);
+
+      Set<ContainerID> nullSafeMissingContainers = this.missingContainers;
+      Set<ContainerID> nullSafeNewContainers = this.newContainers;
+      if (nullSafeNewContainers == null) {
+        nullSafeNewContainers = Collections.emptySet();
+      }
+      if (nullSafeMissingContainers == null) {
+        nullSafeMissingContainers = Collections.emptySet();
+      }
+      return new ReportResult(status, nullSafeMissingContainers,
+          nullSafeNewContainers);
     }
   }
 }
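What the builder guard buys, as a hypothetical call; `setNewContainers` and `build` are visible in the hunk above, while the builder factory and status setter names are assumptions made for illustration:

```java
// Before this change, leaving one of the two sets unset handed a null to the
// ReportResult constructor (which now also rejects null via checkNotNull).
// With the guard, the unset side comes back as an immutable empty set.
ReportResult result = ReportResult.ReportResultBuilder.newBuilder() // assumed
    .setStatus(status)                 // assumed setter name
    .setNewContainers(newContainers)   // missing containers never set
    .build();

assert result.getMissingContainers().isEmpty(); // empty set, not null
```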
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 9cb13181d3..47a9100b2b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -164,9 +166,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * Key = DatanodeUuid, value = ContainerStat.
    */
   private Cache<String, ContainerStat> containerReportCache;
+
   private final ReplicationManager replicationManager;
+
   private final LeaseManager<Long> commandWatcherLeaseManager;
 
+  private final ReplicationActivityStatus replicationStatus;
+
   /**
    * Creates a new StorageContainerManager. Configuration will be updated
    * with information on the
@@ -199,19 +205,26 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
 
     Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
 
+    replicationStatus = new ReplicationActivityStatus();
+
     CloseContainerEventHandler closeContainerHandler =
         new CloseContainerEventHandler(scmContainerManager);
     NodeReportHandler nodeReportHandler = new NodeReportHandler(scmNodeManager);
-    ContainerReportHandler containerReportHandler =
-        new ContainerReportHandler(scmContainerManager, node2ContainerMap);
+    CommandStatusReportHandler cmdStatusReportHandler =
+        new CommandStatusReportHandler();
+
     NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
     StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap);
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
+    ContainerReportHandler containerReportHandler =
+        new ContainerReportHandler(scmContainerManager, node2ContainerMap,
+            replicationStatus);
+
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
     eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
@@ -221,6 +234,7 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
     eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
+    eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
@@ -580,6 +594,7 @@ public void start() throws IOException {
         "server", getDatanodeProtocolServer().getDatanodeRpcAddress()));
     getDatanodeProtocolServer().start();
+    replicationStatus.start();
 
     httpServer.start();
     scmBlockManager.start();
     replicationManager.start();
@@ -591,6 +606,14 @@
    */
   public void stop() {
 
+    try {
+      LOG.info("Stopping Replication Activity Status tracker.");
+      replicationStatus.close();
+    } catch (Exception ex) {
+      LOG.error("Replication Activity Status tracker stop failed.", ex);
+    }
+
     try {
       LOG.info("Stopping Replication Manager Service.");
       replicationManager.stop();
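Taken together, the SCM wiring above produces the following gated flow (a sketch; the component that eventually fires `START_REPLICATION`, for example a chill-mode exit hook, is outside this diff):

```java
// 1. Container reports always update SCM state, but replication is gated:
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);

// 2. While the gate is closed, ContainerReportHandler only records replicas.
// 3. Once the cluster view is complete enough, some component fires:
eventQueue.fireEvent(SCMEvents.START_REPLICATION, Boolean.TRUE);

// 4. From then on, any report leaving a container with
//    existingReplicas != expectedReplicas results in:
//    publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
//        new ReplicationRequest(id, existingReplicas, expectedReplicas));
```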
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
new file mode 100644
index 0000000000..363db99687
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo
+    .Builder;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.replication
+    .ReplicationActivityStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .ContainerReportFromDatanode;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyLong;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the behaviour of the ContainerReportHandler.
+ */ +public class TestContainerReportHandler implements EventPublisher { + + private List publishedEvents = new ArrayList<>(); + + private static final Logger LOG = + LoggerFactory.getLogger(TestContainerReportHandler.class); + + @Before + public void resetEventCollector() { + publishedEvents.clear(); + } + + @Test + public void test() throws IOException { + + //given + + OzoneConfiguration conf = new OzoneConfiguration(); + Node2ContainerMap node2ContainerMap = new Node2ContainerMap(); + Mapping mapping = Mockito.mock(Mapping.class); + + when(mapping.getContainer(anyLong())) + .thenAnswer( + (Answer) invocation -> + new Builder() + .setReplicationFactor(ReplicationFactor.THREE) + .setContainerID((Long) invocation.getArguments()[0]) + .build() + ); + + ContainerStateManager containerStateManager = + new ContainerStateManager(conf, mapping); + + when(mapping.getStateManager()).thenReturn(containerStateManager); + + ReplicationActivityStatus replicationActivityStatus = + new ReplicationActivityStatus(); + + ContainerReportHandler reportHandler = + new ContainerReportHandler(mapping, node2ContainerMap, + replicationActivityStatus); + + DatanodeDetails dn1 = TestUtils.randomDatanodeDetails(); + DatanodeDetails dn2 = TestUtils.randomDatanodeDetails(); + DatanodeDetails dn3 = TestUtils.randomDatanodeDetails(); + DatanodeDetails dn4 = TestUtils.randomDatanodeDetails(); + node2ContainerMap.insertNewDatanode(dn1.getUuid(), new HashSet<>()); + node2ContainerMap.insertNewDatanode(dn2.getUuid(), new HashSet<>()); + node2ContainerMap.insertNewDatanode(dn3.getUuid(), new HashSet<>()); + node2ContainerMap.insertNewDatanode(dn4.getUuid(), new HashSet<>()); + PipelineSelector pipelineSelector = Mockito.mock(PipelineSelector.class); + + Pipeline pipeline = new Pipeline("leader", LifeCycleState.CLOSED, + ReplicationType.STAND_ALONE, ReplicationFactor.THREE, "pipeline1"); + + when(pipelineSelector.getReplicationPipeline(ReplicationType.STAND_ALONE, + ReplicationFactor.THREE)).thenReturn(pipeline); + + long c1 = containerStateManager + .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE, + ReplicationFactor.THREE, "root").getContainerInfo() + .getContainerID(); + + long c2 = containerStateManager + .allocateContainer(pipelineSelector, ReplicationType.STAND_ALONE, + ReplicationFactor.THREE, "root").getContainerInfo() + .getContainerID(); + + //when + + //initial reports before replication is enabled. 2 containers w 3 replicas. 
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn1,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn3,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn4,
+            createContainerReport(new long[] {})), this);
+
+    Assert.assertEquals(0, publishedEvents.size());
+
+    replicationActivityStatus.enableReplication();
+
+    //no problem here: dn1 still reports both containers
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn1,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    Assert.assertEquals(0, publishedEvents.size());
+
+    //container c2 is missing from dn2
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1})), this);
+
+    Assert.assertEquals(1, publishedEvents.size());
+    ReplicationRequest replicationRequest =
+        (ReplicationRequest) publishedEvents.get(0);
+
+    Assert.assertEquals(c2, replicationRequest.getContainerId());
+    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+    Assert.assertEquals(2, replicationRequest.getReplicationCount());
+
+    //container c2 was replicated to dn4
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn4,
+            createContainerReport(new long[] {c2})), this);
+
+    //no more events, everything is perfect
+    Assert.assertEquals(1, publishedEvents.size());
+
+    //c2 reappeared on dn2 (it was reported missing before)
+    reportHandler.onMessage(
+        new ContainerReportFromDatanode(dn2,
+            createContainerReport(new long[] {c1, c2})), this);
+
+    //c2 is now over-replicated (dn1, dn2, dn3, dn4)
+    Assert.assertEquals(2, publishedEvents.size());
+
+    replicationRequest =
+        (ReplicationRequest) publishedEvents.get(1);
+
+    Assert.assertEquals(c2, replicationRequest.getContainerId());
+    Assert.assertEquals(3, replicationRequest.getExpecReplicationCount());
+    Assert.assertEquals(4, replicationRequest.getReplicationCount());
+
+  }
+
+  private ContainerReportsProto createContainerReport(long[] containerIds) {
+
+    ContainerReportsProto.Builder crBuilder =
+        ContainerReportsProto.newBuilder();
+
+    for (long containerId : containerIds) {
+      org.apache.hadoop.hdds.protocol.proto
+          .StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder
+          ciBuilder = org.apache.hadoop.hdds.protocol.proto
+          .StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
+      ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2")
+          .setSize(5368709120L)
+          .setUsed(2000000000L)
+          .setKeyCount(100000000L)
+          .setReadCount(100000000L)
+          .setWriteCount(100000000L)
+          .setReadBytes(2000000000L)
+          .setWriteBytes(2000000000L)
+          .setContainerID(containerId)
+          .setDeleteTransactionId(0);
+
+      crBuilder.addReports(ciBuilder.build());
+    }
+
+    return crBuilder.build();
+  }
+
+  @Override
+  public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
+      EVENT_TYPE event, PAYLOAD payload) {
+    LOG.info("Event is published: {}", payload);
+    publishedEvents.add(payload);
+  }
+}
\ No newline at end of file
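The renamed test that follows also swaps `TreeSet.pollFirst()`/`pollLast()` for `first()`/`last()`. The distinction matters because the poll variants mutate the set; a quick standalone sketch:

```java
import java.util.Arrays;
import java.util.TreeSet;

// pollFirst() removes and returns the smallest element; first() only reads
// it. The old test code therefore dropped a container from the shared test
// data while merely trying to pick a starting ID.
public final class PollVsPeekSketch {
  public static void main(String[] args) {
    TreeSet<Long> ids = new TreeSet<>(Arrays.asList(1L, 2L, 3L));
    System.out.println(ids.first());     // 1; ids is still [1, 2, 3]
    System.out.println(ids.pollFirst()); // 1; ids is now [2, 3]
  }
}
```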
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
similarity index 91%
rename from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java
rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
index 79f1b40db0..633653b839 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/Node2ContainerMapTest.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/states/TestNode2ContainerMap.java
@@ -38,7 +38,7 @@
 /**
  * Test classes for Node2ContainerMap.
  */
-public class Node2ContainerMapTest {
+public class TestNode2ContainerMap {
   private final static int DATANODE_COUNT = 300;
   private final static int CONTAINER_COUNT = 1000;
   private final Map<UUID, TreeSet<ContainerID>> testData = new
@@ -119,6 +119,26 @@ public void testProcessReportCheckOneNode() throws SCMException {
         Node2ContainerMap.ReportStatus.ALL_IS_WELL);
   }
 
+  @Test
+  public void testUpdateDatanodeMap() throws SCMException {
+    UUID datanodeId = getFirstKey();
+    Set<ContainerID> values = testData.get(datanodeId);
+    Node2ContainerMap map = new Node2ContainerMap();
+    map.insertNewDatanode(datanodeId, values);
+    Assert.assertTrue(map.isKnownDatanode(datanodeId));
+    Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size());
+
+    //remove one container
+    values.remove(values.iterator().next());
+    Assert.assertEquals(CONTAINER_COUNT - 1, values.size());
+    Assert.assertEquals(CONTAINER_COUNT, map.getContainers(datanodeId).size());
+
+    map.setContainersForDatanode(datanodeId, values);
+
+    Assert.assertEquals(values.size(), map.getContainers(datanodeId).size());
+    Assert.assertEquals(values, map.getContainers(datanodeId));
+  }
+
   @Test
   public void testProcessReportInsertAll() throws SCMException {
     Node2ContainerMap map = new Node2ContainerMap();
@@ -183,7 +203,7 @@ public void testProcessReportDetectNewContainers() throws SCMException {
     final int newCount = 100;
     // This is not a mistake, the treeset seems to be reverse sorted.
-    ContainerID last = values.pollFirst();
+    ContainerID last = values.first();
     TreeSet<ContainerID> addedContainers = new TreeSet<>();
     for (int x = 1; x <= newCount; x++) {
       long cTemp = last.getId() + x;
@@ -224,7 +244,7 @@ public void testProcessReportDetectMissingContainers() throws SCMException {
     final int removeCount = 100;
     Random r = new Random();
 
-    ContainerID first = values.pollLast();
+    ContainerID first = values.last();
     TreeSet<ContainerID> removedContainers = new TreeSet<>();
 
     // Pick a random container to remove it is ok to collide no issues.
@@ -270,7 +290,7 @@ public void testProcessReportDetectNewAndMissingContainers() throws
     final int removeCount = 100;
     Random r = new Random();
 
-    ContainerID first = values.pollLast();
+    ContainerID first = values.last();
    TreeSet<ContainerID> removedContainers = new TreeSet<>();

    // Pick a random container to remove it is ok to collide no issues.