diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java index 05d4e77117..7b5c467e67 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java @@ -108,13 +108,6 @@ public class ContainerInfo implements Comparator, this.replicationType = repType; } - public ContainerInfo(ContainerInfo info) { - this(info.getContainerID(), info.getState(), info.getPipelineID(), - info.getUsedBytes(), info.getNumberOfKeys(), - info.getStateEnterTime(), info.getOwner(), - info.getDeleteTransactionId(), info.getSequenceId(), - info.getReplicationFactor(), info.getReplicationType()); - } /** * Needed for serialization findbugs. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java index 95e0d93a63..fff1fb2463 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/chillmode/ChillModeHandler.java @@ -20,8 +20,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.scm.block.BlockManager; -import org.apache.hadoop.hdds.scm.container.replication. - ReplicationActivityStatus; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus; import org.apache.hadoop.hdds.server.events.EventHandler; @@ -41,7 +40,7 @@ public class ChillModeHandler implements EventHandler { private final BlockManager scmBlockManager; private final long waitTime; private final AtomicBoolean isInChillMode = new AtomicBoolean(true); - private final ReplicationActivityStatus replicationActivityStatus; + private final ReplicationManager replicationManager; /** @@ -49,27 +48,27 @@ public class ChillModeHandler implements EventHandler { * @param configuration * @param clientProtocolServer * @param blockManager - * @param replicationStatus + * @param replicationManager */ public ChillModeHandler(Configuration configuration, SCMClientProtocolServer clientProtocolServer, BlockManager blockManager, - ReplicationActivityStatus replicationStatus) { + ReplicationManager replicationManager) { Objects.requireNonNull(configuration, "Configuration cannot be null"); Objects.requireNonNull(clientProtocolServer, "SCMClientProtocolServer " + "object cannot be null"); Objects.requireNonNull(blockManager, "BlockManager object cannot be null"); - Objects.requireNonNull(replicationStatus, "ReplicationActivityStatus " + + Objects.requireNonNull(replicationManager, "ReplicationManager " + "object cannot be null"); this.waitTime = configuration.getTimeDuration( HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT, HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT, TimeUnit.MILLISECONDS); - scmClientProtocolServer = clientProtocolServer; - scmBlockManager = blockManager; - replicationActivityStatus = replicationStatus; + this.scmClientProtocolServer = clientProtocolServer; + this.scmBlockManager = blockManager; + this.replicationManager = replicationManager; - boolean chillModeEnabled = configuration.getBoolean( + final boolean chillModeEnabled = configuration.getBoolean( HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED, HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT); isInChillMode.set(chillModeEnabled); @@ -89,13 +88,16 @@ public ChillModeHandler(Configuration configuration, @Override public void onMessage(ChillModeStatus chillModeStatus, EventPublisher publisher) { - isInChillMode.set(chillModeStatus.getChillModeStatus()); - - replicationActivityStatus.fireReplicationStart(isInChillMode.get(), - waitTime); - scmClientProtocolServer.setChillModeStatus(isInChillMode.get()); - scmBlockManager.setChillModeStatus(isInChillMode.get()); - + try { + isInChillMode.set(chillModeStatus.getChillModeStatus()); + scmClientProtocolServer.setChillModeStatus(isInChillMode.get()); + scmBlockManager.setChillModeStatus(isInChillMode.get()); + Thread.sleep(waitTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + replicationManager.start(); + } } public boolean getChillModeStatus() { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java new file mode 100644 index 0000000000..f660442b37 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.UUID; +import java.util.function.Supplier; + +/** + * Base class for all the container report handlers. + */ +public class AbstractContainerReportHandler { + + private final ContainerManager containerManager; + private final Logger logger; + + /** + * Constructs AbstractContainerReportHandler instance with the + * given ContainerManager instance. + * + * @param containerManager ContainerManager + * @param logger Logger to be used for logging + */ + AbstractContainerReportHandler(final ContainerManager containerManager, + final Logger logger) { + Preconditions.checkNotNull(containerManager); + Preconditions.checkNotNull(logger); + this.containerManager = containerManager; + this.logger = logger; + } + + /** + * Process the given ContainerReplica received from specified datanode. + * + * @param datanodeDetails DatanodeDetails of the node which reported + * this replica + * @param replicaProto ContainerReplica + * + * @throws IOException In case of any Exception while processing the report + */ + void processContainerReplica(final DatanodeDetails datanodeDetails, + final ContainerReplicaProto replicaProto) + throws IOException { + final ContainerID containerId = ContainerID + .valueof(replicaProto.getContainerID()); + final ContainerReplica replica = ContainerReplica.newBuilder() + .setContainerID(containerId) + .setContainerState(replicaProto.getState()) + .setDatanodeDetails(datanodeDetails) + .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId())) + .setSequenceId(replicaProto.getBlockCommitSequenceId()) + .build(); + + logger.debug("Processing replica of container {} from datanode {}", + containerId, datanodeDetails); + // Synchronized block should be replaced by container lock, + // once we have introduced lock inside ContainerInfo. + synchronized (containerManager.getContainer(containerId)) { + updateContainerStats(containerId, replicaProto); + updateContainerState(datanodeDetails, containerId, replica); + containerManager.updateContainerReplica(containerId, replica); + } + } + + /** + * Update the container stats if it's lagging behind the stats in reported + * replica. + * + * @param containerId ID of the container + * @param replicaProto Container Replica information + * @throws ContainerNotFoundException If the container is not present + */ + private void updateContainerStats(final ContainerID containerId, + final ContainerReplicaProto replicaProto) + throws ContainerNotFoundException { + + if (!isUnhealthy(replicaProto::getState)) { + final ContainerInfo containerInfo = containerManager + .getContainer(containerId); + + if (containerInfo.getSequenceId() < + replicaProto.getBlockCommitSequenceId()) { + containerInfo.updateSequenceId( + replicaProto.getBlockCommitSequenceId()); + } + if (containerInfo.getUsedBytes() < replicaProto.getUsed()) { + containerInfo.setUsedBytes(replicaProto.getUsed()); + } + if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) { + containerInfo.setNumberOfKeys(replicaProto.getKeyCount()); + } + } + } + + /** + * Updates the container state based on the given replica state. + * + * @param datanode Datanode from which the report is received + * @param containerId ID of the container + * @param replica ContainerReplica + * @throws IOException In case of Exception + */ + private void updateContainerState(final DatanodeDetails datanode, + final ContainerID containerId, + final ContainerReplica replica) + throws IOException { + + final ContainerInfo container = containerManager + .getContainer(containerId); + + switch (container.getState()) { + case OPEN: + /* + * If the state of a container is OPEN, datanodes cannot report + * any other state. + */ + if (replica.getState() != State.OPEN) { + logger.warn("Container {} is in OPEN state, but the datanode {} " + + "reports an {} replica.", containerId, + datanode, replica.getState()); + // Should we take some action? + } + break; + case CLOSING: + /* + * When the container is in CLOSING state the replicas can be in any + * of the following states: + * + * - OPEN + * - CLOSING + * - QUASI_CLOSED + * - CLOSED + * + * If all the replica are either in OPEN or CLOSING state, do nothing. + * + * If the replica is in QUASI_CLOSED state, move the container to + * QUASI_CLOSED state. + * + * If the replica is in CLOSED state, mark the container as CLOSED. + * + */ + + if (replica.getState() == State.QUASI_CLOSED) { + logger.info("Moving container {} to QUASI_CLOSED state, datanode {} " + + "reported QUASI_CLOSED replica.", containerId, datanode); + containerManager.updateContainerState(containerId, + LifeCycleEvent.QUASI_CLOSE); + } + + if (replica.getState() == State.CLOSED) { + logger.info("Moving container {} to CLOSED state, datanode {} " + + "reported CLOSED replica.", containerId, datanode); + Preconditions.checkArgument(replica.getSequenceId() + == container.getSequenceId()); + containerManager.updateContainerState(containerId, + LifeCycleEvent.CLOSE); + } + + break; + case QUASI_CLOSED: + /* + * The container is in QUASI_CLOSED state, this means that at least + * one of the replica was QUASI_CLOSED. + * + * Now replicas can be in any of the following state. + * + * 1. OPEN + * 2. CLOSING + * 3. QUASI_CLOSED + * 4. CLOSED + * + * If at least one of the replica is in CLOSED state, mark the + * container as CLOSED. + * + */ + if (replica.getState() == State.CLOSED) { + logger.info("Moving container {} to CLOSED state, datanode {} " + + "reported CLOSED replica.", containerId, datanode); + Preconditions.checkArgument(replica.getSequenceId() + == container.getSequenceId()); + containerManager.updateContainerState(containerId, + LifeCycleEvent.FORCE_CLOSE); + } + break; + case CLOSED: + /* + * The container is already in closed state. do nothing. + */ + break; + case DELETING: + throw new UnsupportedOperationException( + "Unsupported container state 'DELETING'."); + case DELETED: + throw new UnsupportedOperationException( + "Unsupported container state 'DELETED'."); + default: + break; + } + } + + /** + * Returns true if the container replica is not marked as UNHEALTHY. + * + * @param replicaState State of the container replica. + * @return true if unhealthy, false otherwise + */ + private boolean isUnhealthy(final Supplier replicaState) { + return replicaState.get() == ContainerReplicaProto.State.UNHEALTHY; + } + +} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index 45007867cc..934b244d7b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -15,13 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdds.scm.container; -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; +package org.apache.hadoop.hdds.scm.container; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto @@ -29,115 +24,85 @@ import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList; -import org.apache.hadoop.hdds.scm.container.replication - .ReplicationActivityStatus; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.server - .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; - -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + /** * Handles container reports from datanode. */ -public class ContainerReportHandler implements - EventHandler { +public class ContainerReportHandler extends AbstractContainerReportHandler + implements EventHandler { private static final Logger LOG = LoggerFactory.getLogger(ContainerReportHandler.class); private final NodeManager nodeManager; - private final PipelineManager pipelineManager; private final ContainerManager containerManager; - private final ReplicationActivityStatus replicationStatus; + /** + * Constructs ContainerReportHandler instance with the + * given NodeManager and ContainerManager instance. + * + * @param nodeManager NodeManager instance + * @param containerManager ContainerManager instance + */ public ContainerReportHandler(final NodeManager nodeManager, - final PipelineManager pipelineManager, - final ContainerManager containerManager, - final ReplicationActivityStatus replicationActivityStatus) { - Preconditions.checkNotNull(nodeManager); - Preconditions.checkNotNull(pipelineManager); - Preconditions.checkNotNull(containerManager); - Preconditions.checkNotNull(replicationActivityStatus); + final ContainerManager containerManager) { + super(containerManager, LOG); this.nodeManager = nodeManager; - this.pipelineManager = pipelineManager; this.containerManager = containerManager; - this.replicationStatus = replicationActivityStatus; } + /** + * Process the container reports from datanodes. + * + * @param reportFromDatanode Container Report + * @param publisher EventPublisher reference + */ @Override public void onMessage(final ContainerReportFromDatanode reportFromDatanode, - final EventPublisher publisher) { + final EventPublisher publisher) { final DatanodeDetails datanodeDetails = reportFromDatanode.getDatanodeDetails(); - final ContainerReportsProto containerReport = reportFromDatanode.getReport(); try { + final List replicas = + containerReport.getReportsList(); + final Set containersInSCM = + nodeManager.getContainers(datanodeDetails); - final List replicas = containerReport - .getReportsList(); - - // ContainerIDs which SCM expects this datanode to have. - final Set expectedContainerIDs = nodeManager - .getContainers(datanodeDetails); - - // ContainerIDs that this datanode actually has. - final Set actualContainerIDs = replicas.parallelStream() + final Set containersInDn = replicas.parallelStream() .map(ContainerReplicaProto::getContainerID) .map(ContainerID::valueof).collect(Collectors.toSet()); - // Container replicas which SCM is not aware of. - final Set newReplicas = - new HashSet<>(actualContainerIDs); - newReplicas.removeAll(expectedContainerIDs); + final Set missingReplicas = new HashSet<>(containersInSCM); + missingReplicas.removeAll(containersInDn); - // Container replicas which are missing from datanode. - final Set missingReplicas = - new HashSet<>(expectedContainerIDs); - missingReplicas.removeAll(actualContainerIDs); + processContainerReplicas(datanodeDetails, replicas); + processMissingReplicas(datanodeDetails, missingReplicas); + updateDeleteTransaction(datanodeDetails, replicas, publisher); - processContainerReplicas(datanodeDetails, replicas, publisher); - - // Remove missing replica from ContainerManager - for (ContainerID id : missingReplicas) { - try { - containerManager.getContainerReplicas(id) - .stream() - .filter(replica -> - replica.getDatanodeDetails().equals(datanodeDetails)) - .findFirst() - .ifPresent(replica -> { - try { - containerManager.removeContainerReplica(id, replica); - } catch (ContainerNotFoundException | - ContainerReplicaNotFoundException e) { - // This should not happen, but even if it happens, not an - // issue - } - }); - } catch (ContainerNotFoundException e) { - LOG.warn("Cannot remove container replica, container {} not found {}", - id, e); - } - } - - // Update the latest set of containers for this datanode in NodeManager. - nodeManager.setContainers(datanodeDetails, actualContainerIDs); - - // Replicate if needed. - newReplicas.forEach(id -> checkReplicationState(id, publisher)); - missingReplicas.forEach(id -> checkReplicationState(id, publisher)); + /* + * Update the latest set of containers for this datanode in + * NodeManager + */ + nodeManager.setContainers(datanodeDetails, containersInDn); } catch (NodeNotFoundException ex) { LOG.error("Received container report from unknown datanode {} {}", @@ -146,37 +111,84 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode, } + /** + * Processes the ContainerReport. + * + * @param datanodeDetails Datanode from which this report was received + * @param replicas list of ContainerReplicaProto + */ private void processContainerReplicas(final DatanodeDetails datanodeDetails, + final List replicas) { + for (ContainerReplicaProto replicaProto : replicas) { + try { + processContainerReplica(datanodeDetails, replicaProto); + } catch (ContainerNotFoundException e) { + LOG.error("Received container report for an unknown container" + + " {} from datanode {}.", replicaProto.getContainerID(), + datanodeDetails, e); + } catch (IOException e) { + LOG.error("Exception while processing container report for container" + + " {} from datanode {}.", replicaProto.getContainerID(), + datanodeDetails, e); + } + } + } + + /** + * Process the missing replica on the given datanode. + * + * @param datanodeDetails DatanodeDetails + * @param missingReplicas ContainerID which are missing on the given datanode + */ + private void processMissingReplicas(final DatanodeDetails datanodeDetails, + final Set missingReplicas) { + for (ContainerID id : missingReplicas) { + try { + containerManager.getContainerReplicas(id).stream() + .filter(replica -> replica.getDatanodeDetails() + .equals(datanodeDetails)).findFirst() + .ifPresent(replica -> { + try { + containerManager.removeContainerReplica(id, replica); + } catch (ContainerNotFoundException | + ContainerReplicaNotFoundException ignored) { + // This should not happen, but even if it happens, not an issue + } + }); + } catch (ContainerNotFoundException e) { + LOG.warn("Cannot remove container replica, container {} not found.", + id, e); + } + } + } + + /** + * Updates the Delete Transaction Id for the given datanode. + * + * @param datanodeDetails DatanodeDetails + * @param replicas List of ContainerReplicaProto + * @param publisher EventPublisher reference + */ + private void updateDeleteTransaction(final DatanodeDetails datanodeDetails, final List replicas, final EventPublisher publisher) { final PendingDeleteStatusList pendingDeleteStatusList = new PendingDeleteStatusList(datanodeDetails); - for (ContainerReplicaProto replicaProto : replicas) { + for (ContainerReplicaProto replica : replicas) { try { - final ContainerID containerID = ContainerID.valueof( - replicaProto.getContainerID()); - - ReportHandlerHelper.processContainerReplica(containerManager, - containerID, replicaProto, datanodeDetails, publisher, LOG); - - final ContainerInfo containerInfo = containerManager - .getContainer(containerID); - + final ContainerInfo containerInfo = containerManager.getContainer( + ContainerID.valueof(replica.getContainerID())); if (containerInfo.getDeleteTransactionId() > - replicaProto.getDeleteTransactionId()) { - pendingDeleteStatusList - .addPendingDeleteStatus(replicaProto.getDeleteTransactionId(), - containerInfo.getDeleteTransactionId(), - containerInfo.getContainerID()); + replica.getDeleteTransactionId()) { + pendingDeleteStatusList.addPendingDeleteStatus( + replica.getDeleteTransactionId(), + containerInfo.getDeleteTransactionId(), + containerInfo.getContainerID()); } - } catch (ContainerNotFoundException e) { - LOG.error("Received container report for an unknown container {} from" - + " datanode {} {}", replicaProto.getContainerID(), - datanodeDetails, e); - } catch (IOException e) { - LOG.error("Exception while processing container report for container" - + " {} from datanode {} {}", replicaProto.getContainerID(), - datanodeDetails, e); + } catch (ContainerNotFoundException cnfe) { + LOG.warn("Cannot update pending delete transaction for " + + "container #{}. Reason: container missing.", + replica.getContainerID()); } } if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { @@ -184,30 +196,4 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails, pendingDeleteStatusList); } } - - private void checkReplicationState(ContainerID containerID, - EventPublisher publisher) { - try { - ContainerInfo container = containerManager.getContainer(containerID); - replicateIfNeeded(container, publisher); - } catch (ContainerNotFoundException ex) { - LOG.warn("Container is missing from containerStateManager. Can't request " - + "replication. {} {}", containerID, ex); - } - - } - - private void replicateIfNeeded(ContainerInfo container, - EventPublisher publisher) throws ContainerNotFoundException { - if (!container.isOpen() && replicationStatus.isReplicationEnabled()) { - final int existingReplicas = containerManager - .getContainerReplicas(container.containerID()).size(); - final int expectedReplicas = container.getReplicationFactor().getNumber(); - if (existingReplicas != expectedReplicas) { - publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, - new ReplicationRequest(container.getContainerID(), - existingReplicas, expectedReplicas)); - } - } - } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index 4af8678a0e..a37bf33bd9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -310,20 +310,19 @@ ContainerInfo allocateContainer( * * @param containerID - ContainerID * @param event - LifeCycle Event - * @return Updated ContainerInfo. * @throws SCMException on Failure. */ - ContainerInfo updateContainerState(final ContainerID containerID, + void updateContainerState(final ContainerID containerID, final HddsProtos.LifeCycleEvent event) throws SCMException, ContainerNotFoundException { final ContainerInfo info = containers.getContainerInfo(containerID); try { + final LifeCycleState oldState = info.getState(); final LifeCycleState newState = stateMachine.getNextState( info.getState(), event); containers.updateState(containerID, info.getState(), newState); containerStateCount.incrementAndGet(newState); - containerStateCount.decrementAndGet(info.getState()); - return containers.getContainerInfo(containerID); + containerStateCount.decrementAndGet(oldState); } catch (InvalidStateTransitionException ex) { String error = String.format("Failed to update container state %s, " + "reason: invalid state transition from state: %s upon " + @@ -334,18 +333,6 @@ ContainerInfo updateContainerState(final ContainerID containerID, } } - /** - * Update the container State. - * @param info - Container Info - * @return ContainerInfo - * @throws SCMException - on Error. - */ - ContainerInfo updateContainerInfo(final ContainerInfo info) - throws ContainerNotFoundException { - containers.updateContainerInfo(info); - return containers.getContainerInfo(info.containerID()); - } - /** * Update deleteTransactionId for a container. * diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index d70edfbe3a..042fd56b6a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -18,55 +18,40 @@ package org.apache.hadoop.hdds.scm.container; -import com.google.common.base.Preconditions; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; -import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.server - .SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode; +import java.io.IOException; + +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos + .ContainerReplicaProto; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .IncrementalContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** * Handles incremental container reports from datanode. */ -public class IncrementalContainerReportHandler implements - EventHandler { +public class IncrementalContainerReportHandler extends + AbstractContainerReportHandler + implements EventHandler { - private static final Logger LOG = - LoggerFactory.getLogger(IncrementalContainerReportHandler.class); - - private final PipelineManager pipelineManager; - private final ContainerManager containerManager; + private static final Logger LOG = LoggerFactory.getLogger( + IncrementalContainerReportHandler.class); public IncrementalContainerReportHandler( - final PipelineManager pipelineManager, final ContainerManager containerManager) { - Preconditions.checkNotNull(pipelineManager); - Preconditions.checkNotNull(containerManager); - this.pipelineManager = pipelineManager; - this.containerManager = containerManager; + super(containerManager, LOG); } @Override - public void onMessage( - final IncrementalContainerReportFromDatanode containerReportFromDatanode, - final EventPublisher publisher) { + public void onMessage(final IncrementalContainerReportFromDatanode report, + final EventPublisher publisher) { for (ContainerReplicaProto replicaProto : - containerReportFromDatanode.getReport().getReportList()) { + report.getReport().getReportList()) { try { - final DatanodeDetails datanodeDetails = containerReportFromDatanode - .getDatanodeDetails(); - final ContainerID containerID = ContainerID - .valueof(replicaProto.getContainerID()); - ReportHandlerHelper.processContainerReplica(containerManager, - containerID, replicaProto, datanodeDetails, publisher, LOG); + processContainerReplica(report.getDatanodeDetails(), replicaProto); } catch (ContainerNotFoundException e) { LOG.warn("Container {} not found!", replicaProto.getContainerID()); } catch (IOException e) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java index 97c600b411..1dce81b9c6 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java @@ -169,6 +169,15 @@ public synchronized void start() { } } + /** + * Returns true if the Replication Monitor Thread is running. + * + * @return true if running, false otherwise + */ + public boolean isRunning() { + return replicationMonitor.isAlive(); + } + /** * Process all the containers immediately. */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java deleted file mode 100644 index c566ca9367..0000000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java +++ /dev/null @@ -1,365 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm.container; - -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; -import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; -import org.slf4j.Logger; - -import java.io.IOException; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - -import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; - -/** - * Helper functions to handler container reports. - */ -public final class ReportHandlerHelper { - - private ReportHandlerHelper() {} - - /** - * Processes the container replica and updates the container state in SCM. - * If needed, sends command to datanode to update the replica state. - * - * @param containerManager ContainerManager instance - * @param containerId Id of the container - * @param replicaProto replica of the container - * @param datanodeDetails datanode where the replica resides - * @param publisher event publisher - * @param logger for logging - * @throws IOException - */ - static void processContainerReplica(final ContainerManager containerManager, - final ContainerID containerId, final ContainerReplicaProto replicaProto, - final DatanodeDetails datanodeDetails, final EventPublisher publisher, - final Logger logger) throws IOException { - - final ContainerReplica replica = ContainerReplica.newBuilder() - .setContainerID(containerId) - .setContainerState(replicaProto.getState()) - .setDatanodeDetails(datanodeDetails) - .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId())) - .setSequenceId(replicaProto.getBlockCommitSequenceId()) - .build(); - - // This is an in-memory update. - containerManager.updateContainerReplica(containerId, replica); - ReportHandlerHelper.reconcileContainerState(containerManager, - containerId, publisher, logger); - - final ContainerInfo containerInfo = containerManager - .getContainer(containerId); - if (containerInfo.getUsedBytes() < replicaProto.getUsed()) { - containerInfo.setUsedBytes(replicaProto.getUsed()); - } - - if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) { - containerInfo.setNumberOfKeys(replicaProto.getKeyCount()); - } - - // Now we have reconciled the container state. If the container state and - // the replica state doesn't match, then take appropriate action. - ReportHandlerHelper.sendReplicaCommands( - datanodeDetails, containerInfo, replica, publisher, logger); - } - - - /** - * Reconcile the container state based on the ContainerReplica states. - * ContainerState is updated after the reconciliation. - * - * @param manager ContainerManager - * @param containerId container id - * @throws ContainerNotFoundException - */ - private static void reconcileContainerState(final ContainerManager manager, - final ContainerID containerId, final EventPublisher publisher, - final Logger logger) throws IOException { - // TODO: handle unhealthy replica. - synchronized (manager.getContainer(containerId)) { - final ContainerInfo container = manager.getContainer(containerId); - final Set replicas = manager.getContainerReplicas( - containerId); - final LifeCycleState containerState = container.getState(); - switch (containerState) { - case OPEN: - /* - * If the container state is OPEN. - * None of the replica should be in any other state. - * - */ - List invalidReplicas = replicas.stream() - .filter(replica -> replica.getState() != State.OPEN) - .collect(Collectors.toList()); - if (!invalidReplicas.isEmpty()) { - logger.warn("Container {} has invalid replica state." + - "Invalid Replicas: {}", containerId, invalidReplicas); - } - // A container cannot be over replicated when in OPEN state. - break; - case CLOSING: - /* - * SCM has asked DataNodes to close the container. Now the replicas - * can be in any of the following states. - * - * 1. OPEN - * 2. CLOSING - * 3. QUASI_CLOSED - * 4. CLOSED - * - * If all the replica are either in OPEN or CLOSING state, do nothing. - * - * If any one of the replica is in QUASI_CLOSED state, move the - * container to QUASI_CLOSED state. - * - * If any one of the replica is in CLOSED state, mark the container as - * CLOSED. The close has happened via Ratis. - * - */ - Optional closedReplica = replicas.stream() - .filter(replica -> replica.getState() == State.CLOSED) - .findFirst(); - if (closedReplica.isPresent()) { - container.updateSequenceId(closedReplica.get().getSequenceId()); - manager.updateContainerState( - containerId, HddsProtos.LifeCycleEvent.CLOSE); - - // TODO: remove container from OPEN pipeline, since the container is - // closed we can go ahead and remove it from Ratis pipeline. - } else if (replicas.stream() - .anyMatch(replica -> replica.getState() == State.QUASI_CLOSED)) { - manager.updateContainerState( - containerId, HddsProtos.LifeCycleEvent.QUASI_CLOSE); - } - break; - case QUASI_CLOSED: - /* - * The container is in QUASI_CLOSED state, this means that at least - * one of the replica is in QUASI_CLOSED/CLOSED state. - * Other replicas can be in any of the following state. - * - * 1. OPEN - * 2. CLOSING - * 3. QUASI_CLOSED - * 4. CLOSED - * - * If <50% of container replicas are in QUASI_CLOSED state and all - * the other replica are either in OPEN or CLOSING state, do nothing. - * We cannot identify the correct replica since we don't have quorum - * yet. - * - * If >50% (quorum) of replicas are in QUASI_CLOSED state and other - * replicas are either in OPEN or CLOSING state, try to identify - * the latest container replica using originNodeId and sequenceId. - * Force close those replica(s) which have the latest sequenceId. - * - * If at least one of the replica is in CLOSED state, mark the - * container as CLOSED. Force close the replicas which matches the - * sequenceId of the CLOSED replica. - * - */ - if (replicas.stream() - .anyMatch(replica -> replica.getState() == State.CLOSED)) { - manager.updateContainerState( - containerId, HddsProtos.LifeCycleEvent.FORCE_CLOSE); - // TODO: remove container from OPEN pipeline, since the container is - // closed we can go ahead and remove it from Ratis pipeline. - } else { - final int replicationFactor = container - .getReplicationFactor().getNumber(); - final List quasiClosedReplicas = replicas.stream() - .filter(replica -> replica.getState() == State.QUASI_CLOSED) - .collect(Collectors.toList()); - final long uniqueQuasiClosedReplicaCount = quasiClosedReplicas - .stream() - .map(ContainerReplica::getOriginDatanodeId) - .distinct() - .count(); - - if (uniqueQuasiClosedReplicaCount > (replicationFactor / 2)) { - // Quorum of unique replica has been QUASI_CLOSED - long sequenceId = forceCloseContainerReplicaWithHighestSequenceId( - container, quasiClosedReplicas, publisher); - if (sequenceId != -1L) { - container.updateSequenceId(sequenceId); - } - } - } - break; - case CLOSED: - /* - * The container is already in closed state. do nothing. - */ - break; - case DELETING: - // Not handled. - throw new UnsupportedOperationException("Unsupported container state" + - " 'DELETING'."); - case DELETED: - // Not handled. - throw new UnsupportedOperationException("Unsupported container state" + - " 'DELETED'."); - default: - break; - } - } - } - - /** - * Compares the QUASI_CLOSED replicas of a container and sends close command. - * - * @param quasiClosedReplicas list of quasi closed replicas - * @return the sequenceId of the closed replica. - */ - private static long forceCloseContainerReplicaWithHighestSequenceId( - final ContainerInfo container, - final List quasiClosedReplicas, - final EventPublisher publisher) { - - final long highestSequenceId = quasiClosedReplicas.stream() - .map(ContainerReplica::getSequenceId) - .max(Long::compare) - .orElse(-1L); - - if (highestSequenceId != -1L) { - quasiClosedReplicas.stream() - .filter(replica -> replica.getSequenceId() == highestSequenceId) - .forEach(replica -> { - CloseContainerCommand closeContainerCommand = - new CloseContainerCommand(container.getContainerID(), - container.getPipelineID(), true); - publisher.fireEvent(DATANODE_COMMAND, - new CommandForDatanode<>( - replica.getDatanodeDetails().getUuid(), - closeContainerCommand)); - }); - } - return highestSequenceId; - } - - /** - * Based on the container and replica state, send command to datanode if - * required. - * - * @param datanodeDetails datanode where the replica resides - * @param containerInfo container information - * @param replica replica information - * @param publisher queue to publish the datanode command event - * @param log for logging - */ - static void sendReplicaCommands( - final DatanodeDetails datanodeDetails, - final ContainerInfo containerInfo, - final ContainerReplica replica, - final EventPublisher publisher, - final Logger log) { - final HddsProtos.LifeCycleState containerState = containerInfo.getState(); - final ContainerReplicaProto.State replicaState = replica.getState(); - - if(!ReportHandlerHelper.compareState(containerState, replicaState)) { - if (containerState == HddsProtos.LifeCycleState.OPEN) { - // When a container state in SCM is OPEN, there is no way a datanode - // can quasi close/close the container. - log.warn("Invalid container replica state for container {}" + - " from datanode {}. Expected state is OPEN.", - containerInfo.containerID(), datanodeDetails); - // The replica can go CORRUPT, we have to handle it. - } - if (containerState == HddsProtos.LifeCycleState.CLOSING || - containerState == HddsProtos.LifeCycleState.QUASI_CLOSED) { - // Resend close container event for this datanode if the container - // replica state is OPEN/CLOSING. - if (replicaState == ContainerReplicaProto.State.OPEN || - replicaState == ContainerReplicaProto.State.CLOSING) { - CloseContainerCommand closeContainerCommand = - new CloseContainerCommand(containerInfo.getContainerID(), - containerInfo.getPipelineID()); - publisher.fireEvent(DATANODE_COMMAND, - new CommandForDatanode<>( - replica.getDatanodeDetails().getUuid(), - closeContainerCommand)); - } - } - if (containerState == HddsProtos.LifeCycleState.CLOSED) { - if (replicaState == ContainerReplicaProto.State.OPEN || - replicaState == ContainerReplicaProto.State.CLOSING || - replicaState == ContainerReplicaProto.State.QUASI_CLOSED) { - // Send force close container event for this datanode if the container - // replica state is OPEN/CLOSING/QUASI_CLOSED. - - // Close command will be send only if this replica matches the - // sequence of the container. - if (containerInfo.getSequenceId() == - replica.getSequenceId()) { - CloseContainerCommand closeContainerCommand = - new CloseContainerCommand(containerInfo.getContainerID(), - containerInfo.getPipelineID(), true); - publisher.fireEvent(DATANODE_COMMAND, - new CommandForDatanode<>( - replica.getDatanodeDetails().getUuid(), - closeContainerCommand)); - } - // TODO: delete the replica if the BCSID doesn't match. - } - } - } - - } - - /** - * Compares the container and replica state. - * - * @param containerState container state - * @param replicaState replica state - * @return true if the states are same, else false - */ - private static boolean compareState(final LifeCycleState containerState, - final State replicaState) { - // TODO: handle unhealthy replica. - switch (containerState) { - case OPEN: - return replicaState == State.OPEN; - case CLOSING: - return replicaState == State.CLOSING; - case QUASI_CLOSED: - return replicaState == State.QUASI_CLOSED; - case CLOSED: - return replicaState == State.CLOSED; - case DELETING: - return false; - case DELETED: - return false; - default: - return false; - } - } - -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index fe52669a09..1fa839567d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -295,18 +295,20 @@ public HddsProtos.LifeCycleState updateContainerState( // Should we return the updated ContainerInfo instead of LifeCycleState? lock.lock(); try { - ContainerInfo container = containerStateManager.getContainer(containerID); - ContainerInfo updatedContainer = - updateContainerStateInternal(containerID, event); - if (updatedContainer.getState() != LifeCycleState.OPEN - && container.getState() == LifeCycleState.OPEN) { + final ContainerInfo container = containerStateManager + .getContainer(containerID); + final LifeCycleState oldState = container.getState(); + containerStateManager.updateContainerState(containerID, event); + final LifeCycleState newState = container.getState(); + + if (oldState == LifeCycleState.OPEN && newState != LifeCycleState.OPEN) { pipelineManager - .removeContainerFromPipeline(updatedContainer.getPipelineID(), + .removeContainerFromPipeline(container.getPipelineID(), containerID); } final byte[] dbKey = Longs.toByteArray(containerID.getId()); - containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray()); - return updatedContainer.getState(); + containerStore.put(dbKey, container.getProtobuf().toByteArray()); + return newState; } catch (ContainerNotFoundException cnfe) { throw new SCMException( "Failed to update container state" @@ -318,11 +320,6 @@ public HddsProtos.LifeCycleState updateContainerState( } } - private ContainerInfo updateContainerStateInternal(ContainerID containerID, - HddsProtos.LifeCycleEvent event) throws IOException { - return containerStateManager.updateContainerState(containerID, event); - } - /** * Update deleteTransactionId according to deleteTransactionMap. diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java index 2aba72487c..7411055d2d 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java @@ -296,8 +296,7 @@ public void updateState(ContainerID containerID, LifeCycleState currentState, checkIfContainerExist(containerID); final ContainerInfo currentInfo = containerMap.get(containerID); try { - final ContainerInfo newInfo = new ContainerInfo(currentInfo); - newInfo.setState(newState); + currentInfo.setState(newState); // We are updating two places before this update is done, these can // fail independently, since the code needs to handle it. @@ -309,13 +308,12 @@ public void updateState(ContainerID containerID, LifeCycleState currentState, // roll back the earlier change we did. If the rollback fails, we can // be in an inconsistent state, - containerMap.put(containerID, newInfo); lifeCycleStateMap.update(currentState, newState, containerID); LOG.trace("Updated the container {} to new state. Old = {}, new = " + "{}", containerID, currentState, newState); // Just flush both old and new data sets from the result cache. - flushCache(currentInfo, newInfo); + flushCache(currentInfo); } catch (SCMException ex) { LOG.error("Unable to update the container state. {}", ex); // we need to revert the change in this attribute since we are not @@ -324,7 +322,7 @@ public void updateState(ContainerID containerID, LifeCycleState currentState, "old state. Old = {}, Attempted state = {}", currentState, newState); - containerMap.put(containerID, currentInfo); + currentInfo.setState(currentState); // if this line throws, the state map can be in an inconsistent // state, since we will have modified the attribute by the diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java index 9c9550330f..a6b0704955 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMConfigurator.java @@ -23,7 +23,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager; import org.apache.hadoop.hdds.scm.container.ContainerManager; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index b51b5376d0..79565f0e36 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -55,8 +55,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity; import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; @@ -99,7 +98,6 @@ import org.apache.hadoop.util.JvmPauseMonitor; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.utils.HddsVersionInfo; -import org.apache.hadoop.utils.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -176,7 +174,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private SCMMetadataStore scmMetadataStore; private final EventQueue eventQueue; - private final Scheduler commonScheduler; /* * HTTP endpoint for JMX access. */ @@ -199,7 +196,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private final LeaseManager commandWatcherLeaseManager; - private final ReplicationActivityStatus replicationStatus; private SCMChillModeManager scmChillModeManager; private CertificateServer certificateServer; @@ -287,8 +283,6 @@ public StorageContainerManager(OzoneConfiguration conf, commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher", watcherTimeout); initalizeSystemManagers(conf, configurator); - commonScheduler = new Scheduler("SCMCommonScheduler", false, 1); - replicationStatus = new ReplicationActivityStatus(commonScheduler); CloseContainerEventHandler closeContainerHandler = new CloseContainerEventHandler(pipelineManager, containerManager); @@ -311,12 +305,10 @@ public StorageContainerManager(OzoneConfiguration conf, new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService()); ContainerReportHandler containerReportHandler = - new ContainerReportHandler(scmNodeManager, pipelineManager, - containerManager, replicationStatus); + new ContainerReportHandler(scmNodeManager, containerManager); IncrementalContainerReportHandler incrementalContainerReportHandler = - new IncrementalContainerReportHandler( - pipelineManager, containerManager); + new IncrementalContainerReportHandler(containerManager); PipelineActionHandler pipelineActionHandler = new PipelineActionHandler(pipelineManager, conf); @@ -343,7 +335,7 @@ public StorageContainerManager(OzoneConfiguration conf, httpServer = new StorageContainerManagerHttpServer(conf); chillModeHandler = new ChillModeHandler(configuration, - clientProtocolServer, scmBlockManager, replicationStatus); + clientProtocolServer, scmBlockManager, replicationManager); eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager); eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager); @@ -422,8 +414,8 @@ private void initalizeSystemManagers(OzoneConfiguration conf, if (configurator.getReplicationManager() != null) { replicationManager = configurator.getReplicationManager(); } else { - replicationManager = new ReplicationManager(containerPlacementPolicy, - containerManager, eventQueue, commandWatcherLeaseManager); + replicationManager = new ReplicationManager(conf, + containerManager, containerPlacementPolicy, eventQueue); } if(configurator.getScmChillModeManager() != null) { scmChillModeManager = configurator.getScmChillModeManager(); @@ -917,8 +909,6 @@ public void start() throws IOException { httpServer.start(); scmBlockManager.start(); - replicationStatus.start(); - replicationManager.start(); // Start jvm monitor jvmPauseMonitor = new JvmPauseMonitor(); @@ -933,14 +923,6 @@ public void start() throws IOException { */ public void stop() { - try { - LOG.info("Stopping Replication Activity Status tracker."); - replicationStatus.close(); - } catch (Exception ex) { - LOG.error("Replication Activity Status tracker stop failed.", ex); - } - - try { LOG.info("Stopping Replication Manager Service."); replicationManager.stop(); @@ -1017,13 +999,6 @@ public void stop() { LOG.error("SCM Event Queue stop failed", ex); } - try { - LOG.info("Stopping SCM Common Scheduler."); - commonScheduler.close(); - } catch (Exception ex) { - LOG.error("SCM Common Scheduler close failed {}", ex); - } - if (jvmPauseMonitor != null) { jvmPauseMonitor.stop(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index 19c35fd462..d61924a91e 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@ -533,6 +533,7 @@ public static ContainerInfo getContainer( .setReplicationType(HddsProtos.ReplicationType.RATIS) .setReplicationFactor(HddsProtos.ReplicationFactor.THREE) .setState(state) + .setSequenceId(10000L) .setOwner("TEST") .build(); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java index efd69fde71..7c9f98e123 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/chillmode/TestChillModeHandler.java @@ -22,16 +22,20 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus; +import org.apache.hadoop.hdds.scm.container.ContainerManager; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; import org.apache.hadoop.hdds.server.events.EventQueue; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.utils.Scheduler; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import java.util.HashSet; + /** * Tests ChillModeHandler behavior. */ @@ -40,7 +44,7 @@ public class TestChillModeHandler { private OzoneConfiguration configuration; private SCMClientProtocolServer scmClientProtocolServer; - private ReplicationActivityStatus replicationActivityStatus; + private ReplicationManager replicationManager; private BlockManager blockManager; private ChillModeHandler chillModeHandler; private EventQueue eventQueue; @@ -54,15 +58,19 @@ public void setup(boolean enabled) { "3s"); scmClientProtocolServer = Mockito.mock(SCMClientProtocolServer.class); - replicationActivityStatus = new ReplicationActivityStatus( - new Scheduler("SCMCommonScheduler", false, 1)); + eventQueue = new EventQueue(); + final ContainerManager containerManager = + Mockito.mock(ContainerManager.class); + Mockito.when(containerManager.getContainerIDs()) + .thenReturn(new HashSet<>()); + replicationManager = new ReplicationManager(configuration, + containerManager, Mockito.mock(ContainerPlacementPolicy.class), + eventQueue); blockManager = Mockito.mock(BlockManagerImpl.class); chillModeHandler = new ChillModeHandler(configuration, scmClientProtocolServer, - blockManager, replicationActivityStatus); + blockManager, replicationManager); - - eventQueue = new EventQueue(); eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler); chillModeStatus = new SCMChillModeManager.ChillModeStatus(false); @@ -82,7 +90,7 @@ public void testChillModeHandlerWithChillModeEnabled() throws Exception { Assert.assertFalse(scmClientProtocolServer.getChillModeStatus()); Assert.assertFalse(((BlockManagerImpl) blockManager).isScmInChillMode()); GenericTestUtils.waitFor(() -> - replicationActivityStatus.isReplicationEnabled(), 1000, 5000); + replicationManager.isRunning(), 1000, 5000); } @@ -99,6 +107,6 @@ public void testChillModeHandlerWithChillModeDisbaled() throws Exception{ Assert.assertFalse(scmClientProtocolServer.getChillModeStatus()); Assert.assertFalse(((BlockManagerImpl) blockManager).isScmInChillMode()); GenericTestUtils.waitFor(() -> - replicationActivityStatus.isReplicationEnabled(), 1000, 5000); + replicationManager.isRunning(), 1000, 5000); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 0b7cae48ad..41585bc8f7 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -16,124 +16,146 @@ */ package org.apache.hadoop.hdds.scm.container; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; -import org.apache.hadoop.hdds.scm.container.replication - .ReplicationActivityStatus; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; -import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.server .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.utils.Scheduler; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.stubbing.Answer; import java.io.IOException; -import java.util.Collections; import java.util.Iterator; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.hadoop.hdds.scm.TestUtils - .getReplicas; -import static org.apache.hadoop.hdds.scm.TestUtils - .getContainer; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.addContainerToContainerManager; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.mockUpdateContainerReplica; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.mockUpdateContainerState; +import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas; +import static org.apache.hadoop.hdds.scm.TestUtils.getContainer; /** * Test the behaviour of the ContainerReportHandler. */ public class TestContainerReportHandler { - private static Scheduler scheduler; + private NodeManager nodeManager; + private ContainerManager containerManager; + private ContainerStateManager containerStateManager; + private EventPublisher publisher; + + @Before + public void setup() throws IOException { + final Configuration conf = new OzoneConfiguration(); + this.nodeManager = new MockNodeManager(true, 10); + this.containerManager = Mockito.mock(ContainerManager.class); + this.containerStateManager = new ContainerStateManager(conf); + this.publisher = Mockito.mock(EventPublisher.class); + + + Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class))) + .thenAnswer(invocation -> containerStateManager + .getContainer((ContainerID)invocation.getArguments()[0])); + + Mockito.when(containerManager.getContainerReplicas( + Mockito.any(ContainerID.class))) + .thenAnswer(invocation -> containerStateManager + .getContainerReplicas((ContainerID)invocation.getArguments()[0])); + + Mockito.doAnswer(invocation -> { + containerStateManager + .updateContainerState((ContainerID)invocation.getArguments()[0], + (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]); + return null; + }).when(containerManager).updateContainerState( + Mockito.any(ContainerID.class), + Mockito.any(HddsProtos.LifeCycleEvent.class)); + + Mockito.doAnswer(invocation -> { + containerStateManager.updateContainerReplica( + (ContainerID) invocation.getArguments()[0], + (ContainerReplica) invocation.getArguments()[1]); + return null; + }).when(containerManager).updateContainerReplica( + Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); + + Mockito.doAnswer(invocation -> { + containerStateManager.removeContainerReplica( + (ContainerID) invocation.getArguments()[0], + (ContainerReplica) invocation.getArguments()[1]); + return null; + }).when(containerManager).removeContainerReplica( + Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); - @BeforeClass - public static void setup() { - scheduler = new Scheduler("SCMCommonScheduler", false, 1); } - @AfterClass - public static void tearDown() { - scheduler.close(); + @After + public void tearDown() throws IOException { + containerStateManager.close(); } @Test public void testUnderReplicatedContainer() - throws NodeNotFoundException, ContainerNotFoundException, - ContainerReplicaNotFoundException { + throws NodeNotFoundException, ContainerNotFoundException, SCMException { - final NodeManager nodeManager = new MockNodeManager(true, 10); - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(scheduler); - replicationActivityStatus.enableReplication(); - - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); final ContainerReportHandler reportHandler = new ContainerReportHandler( - nodeManager, pipelineManager, containerManager, - replicationActivityStatus); + nodeManager, containerManager); final Iterator nodeIterator = nodeManager.getNodes( NodeState.HEALTHY).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED); final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); final Set containerIDSet = Stream.of( containerOne.containerID(), containerTwo.containerID()) .collect(Collectors.toSet()); - final Set containerOneReplicas = getReplicas( - containerOne.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne, datanodeTwo, datanodeThree); - final Set containerTwoReplicas = getReplicas( - containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne, datanodeTwo, datanodeThree); nodeManager.setContainers(datanodeOne, containerIDSet); nodeManager.setContainers(datanodeTwo, containerIDSet); nodeManager.setContainers(datanodeThree, containerIDSet); - addContainerToContainerManager( - containerManager, containerOne, containerOneReplicas); - addContainerToContainerManager( - containerManager, containerTwo, containerTwoReplicas); + containerStateManager.loadContainer(containerOne); + containerStateManager.loadContainer(containerTwo); - Mockito.doAnswer((Answer) invocation -> { - Object[] args = invocation.getArguments(); - if (args[0].equals(containerOne.containerID())) { - ContainerReplica replica = (ContainerReplica) args[1]; - containerOneReplicas.remove(replica); - } - return null; - }).when(containerManager).removeContainerReplica( - Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); + getReplicas(containerOne.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree) + .forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerOne.containerID(), r); + } catch (ContainerNotFoundException ignored) { + } + }); + + getReplicas(containerTwo.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree) + .forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerTwo.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); - Mockito.when( - containerManager.getContainerReplicas(containerOne.containerID())) - .thenReturn(containerOneReplicas); - Mockito.when( - containerManager.getContainerReplicas(containerTwo.containerID())) - .thenReturn(containerTwoReplicas); // SCM expects both containerOne and containerTwo to be in all the three // datanodes datanodeOne, datanodeTwo and datanodeThree @@ -145,70 +167,65 @@ public void testUnderReplicatedContainer() final ContainerReportsProto containerReport = getContainerReportsProto( containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, datanodeOne.getUuidString()); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); + Assert.assertEquals(2, containerManager.getContainerReplicas( + containerOne.containerID()).size()); - // Now we should get a replication request for containerOne - Mockito.verify(publisher, Mockito.times(1)) - .fireEvent(Mockito.any(), Mockito.any()); - - // TODO: verify whether are actually getting a replication request event - // for containerOne } @Test public void testOverReplicatedContainer() throws NodeNotFoundException, - ContainerNotFoundException { + SCMException, ContainerNotFoundException { - final NodeManager nodeManager = new MockNodeManager(true, 10); - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(scheduler); - replicationActivityStatus.enableReplication(); - - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); final ContainerReportHandler reportHandler = new ContainerReportHandler( - nodeManager, pipelineManager, containerManager, - replicationActivityStatus); + nodeManager, containerManager); + final Iterator nodeIterator = nodeManager.getNodes( NodeState.HEALTHY).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); final DatanodeDetails datanodeFour = nodeIterator.next(); + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED); final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( containerOne.containerID(), containerTwo.containerID()) .collect(Collectors.toSet()); - final Set containerOneReplicas = getReplicas( - containerOne.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne, datanodeTwo, datanodeThree); - final Set containerTwoReplicas = getReplicas( - containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne, datanodeTwo, datanodeThree); nodeManager.setContainers(datanodeOne, containerIDSet); nodeManager.setContainers(datanodeTwo, containerIDSet); nodeManager.setContainers(datanodeThree, containerIDSet); - addContainerToContainerManager( - containerManager, containerOne, containerOneReplicas); - addContainerToContainerManager( - containerManager, containerTwo, containerTwoReplicas); + containerStateManager.loadContainer(containerOne); + containerStateManager.loadContainer(containerTwo); - Mockito.doAnswer((Answer) invocation -> { - Object[] args = invocation.getArguments(); - if (args[0].equals(containerOne.containerID())) { - containerOneReplicas.add((ContainerReplica) args[1]); - } - return null; - }).when(containerManager).updateContainerReplica( - Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); + getReplicas(containerOne.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree) + .forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerOne.containerID(), r); + } catch (ContainerNotFoundException ignored) { + } + }); + + getReplicas(containerTwo.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree) + .forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerTwo.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); // SCM expects both containerOne and containerTwo to be in all the three @@ -220,114 +237,15 @@ public void testOverReplicatedContainer() throws NodeNotFoundException, final ContainerReportsProto containerReport = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne.getUuidString()); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); + datanodeFour.getUuidString()); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeFour, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); - Mockito.verify(publisher, Mockito.times(1)) - .fireEvent(Mockito.any(), Mockito.any()); - // TODO: verify whether are actually getting a replication request event - // for containerOne + Assert.assertEquals(4, containerManager.getContainerReplicas( + containerOne.containerID()).size()); } - @Test - public void testOpenToClosing() - throws NodeNotFoundException, ContainerNotFoundException { - /* - * The container is in CLOSING state and all the replicas are either in - * OPEN or CLOSING state. - * - * The datanode reports that the replica is still in OPEN state. - * - * In this case SCM should trigger close container event to the datanode. - */ - - final NodeManager nodeManager = new MockNodeManager(true, 10); - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(scheduler); - replicationActivityStatus.enableReplication(); - - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); - final ContainerReportHandler reportHandler = new ContainerReportHandler( - nodeManager, pipelineManager, containerManager, - replicationActivityStatus); - final Iterator nodeIterator = nodeManager.getNodes( - NodeState.HEALTHY).iterator(); - final DatanodeDetails datanodeOne = nodeIterator.next(); - final DatanodeDetails datanodeTwo = nodeIterator.next(); - final DatanodeDetails datanodeThree = nodeIterator.next(); - final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING); - final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); - final Set containerIDSet = Stream.of( - containerOne.containerID(), containerTwo.containerID()) - .collect(Collectors.toSet()); - final Set containerOneReplicas = getReplicas( - containerOne.containerID(), ContainerReplicaProto.State.OPEN, - datanodeOne); - - containerOneReplicas.addAll(getReplicas( - containerOne.containerID(), ContainerReplicaProto.State.CLOSING, - datanodeTwo, datanodeThree)); - - final Set containerTwoReplicas = getReplicas( - containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeOne, datanodeTwo, datanodeThree); - - nodeManager.setContainers(datanodeOne, containerIDSet); - nodeManager.setContainers(datanodeTwo, containerIDSet); - nodeManager.setContainers(datanodeThree, containerIDSet); - - addContainerToContainerManager( - containerManager, containerOne, containerOneReplicas); - addContainerToContainerManager( - containerManager, containerTwo, containerTwoReplicas); - mockUpdateContainerReplica( - containerManager, containerOne, containerOneReplicas); - - // Replica in datanodeOne of containerOne is in OPEN state. - final ContainerReportsProto containerReport = getContainerReportsProto( - containerOne.containerID(), ContainerReplicaProto.State.OPEN, - datanodeOne.getUuidString()); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); - final ContainerReportFromDatanode containerReportFromDatanode = - new ContainerReportFromDatanode(datanodeOne, containerReport); - reportHandler.onMessage(containerReportFromDatanode, publisher); - - // Now we should get close container event for containerOne on datanodeOne - Mockito.verify(publisher, Mockito.times(1)) - .fireEvent(Mockito.any(), Mockito.any()); - - // TODO: verify whether are actually getting a close container - // datanode command for containerOne/datanodeOne - - /* - * The container is in CLOSING state and all the replicas are either in - * OPEN or CLOSING state. - * - * The datanode reports that the replica is in CLOSING state. - * - * In this case SCM should trigger close container event to the datanode. - */ - - // Replica in datanodeOne of containerOne is in OPEN state. - final ContainerReportsProto containerReportTwo = getContainerReportsProto( - containerOne.containerID(), ContainerReplicaProto.State.OPEN, - datanodeOne.getUuidString()); - final ContainerReportFromDatanode containerReportTwoFromDatanode = - new ContainerReportFromDatanode(datanodeOne, containerReportTwo); - reportHandler.onMessage(containerReportTwoFromDatanode, publisher); - - // Now we should get close container event for containerOne on datanodeOne - Mockito.verify(publisher, Mockito.times(2)) - .fireEvent(Mockito.any(), Mockito.any()); - - // TODO: verify whether are actually getting a close container - // datanode command for containerOne/datanodeOne - } @Test public void testClosingToClosed() throws NodeNotFoundException, IOException { @@ -339,27 +257,23 @@ public void testClosingToClosed() throws NodeNotFoundException, IOException { * * In this case SCM should mark the container as CLOSED. */ - final NodeManager nodeManager = new MockNodeManager(true, 10); - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(scheduler); - replicationActivityStatus.enableReplication(); - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); final ContainerReportHandler reportHandler = new ContainerReportHandler( - nodeManager, pipelineManager, containerManager, - replicationActivityStatus); + nodeManager, containerManager); + final Iterator nodeIterator = nodeManager.getNodes( NodeState.HEALTHY).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING); final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( containerOne.containerID(), containerTwo.containerID()) .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( containerOne.containerID(), ContainerReplicaProto.State.CLOSING, @@ -379,25 +293,36 @@ public void testClosingToClosed() throws NodeNotFoundException, IOException { nodeManager.setContainers(datanodeTwo, containerIDSet); nodeManager.setContainers(datanodeThree, containerIDSet); - addContainerToContainerManager( - containerManager, containerOne, containerOneReplicas); - addContainerToContainerManager( - containerManager, containerTwo, containerTwoReplicas); - mockUpdateContainerReplica( - containerManager, containerOne, containerOneReplicas); - mockUpdateContainerState(containerManager, containerOne, - LifeCycleEvent.CLOSE, LifeCycleState.CLOSED); + containerStateManager.loadContainer(containerOne); + containerStateManager.loadContainer(containerTwo); + + containerOneReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerTwo.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); + + containerTwoReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerTwo.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); + final ContainerReportsProto containerReport = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.CLOSED, datanodeOne.getUuidString()); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); - Assert.assertEquals( - LifeCycleState.CLOSED, containerOne.getState()); + Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState()); } @Test @@ -411,29 +336,23 @@ public void testClosingToQuasiClosed() * * In this case SCM should move the container to QUASI_CLOSED. */ - final NodeManager nodeManager = new MockNodeManager(true, 10); - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(scheduler); - replicationActivityStatus.enableReplication(); - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); final ContainerReportHandler reportHandler = new ContainerReportHandler( - nodeManager, pipelineManager, containerManager, - replicationActivityStatus); + nodeManager, containerManager); + final Iterator nodeIterator = nodeManager.getNodes( NodeState.HEALTHY).iterator(); final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); - final ContainerInfo containerOne = - getContainer(LifeCycleState.CLOSING); - final ContainerInfo containerTwo = - getContainer(LifeCycleState.CLOSED); + + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING); + final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( containerOne.containerID(), containerTwo.containerID()) .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( containerOne.containerID(), ContainerReplicaProto.State.CLOSING, @@ -451,60 +370,65 @@ public void testClosingToQuasiClosed() nodeManager.setContainers(datanodeTwo, containerIDSet); nodeManager.setContainers(datanodeThree, containerIDSet); - addContainerToContainerManager( - containerManager, containerOne, containerOneReplicas); - addContainerToContainerManager( - containerManager, containerTwo, containerTwoReplicas); - mockUpdateContainerReplica( - containerManager, containerOne, containerOneReplicas); - mockUpdateContainerState(containerManager, containerOne, - LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED); + containerStateManager.loadContainer(containerOne); + containerStateManager.loadContainer(containerTwo); + + containerOneReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerTwo.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); + + containerTwoReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerTwo.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); + final ContainerReportsProto containerReport = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, datanodeOne.getUuidString()); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); final ContainerReportFromDatanode containerReportFromDatanode = new ContainerReportFromDatanode(datanodeOne, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); - Assert.assertEquals( - LifeCycleState.QUASI_CLOSED, containerOne.getState()); + Assert.assertEquals(LifeCycleState.QUASI_CLOSED, containerOne.getState()); } @Test - public void testQuasiClosedWithDifferentOriginNodeReplica() + public void testQuasiClosedToClosed() throws NodeNotFoundException, IOException { /* * The container is in QUASI_CLOSED state. * - One of the replica is in QUASI_CLOSED state * - The other two replica are in OPEN/CLOSING state * - * The datanode reports the second replica is now QUASI_CLOSED. + * The datanode reports the second replica is now CLOSED. * - * In this case SCM should CLOSE the container with highest BCSID and - * send force close command to the datanode. + * In this case SCM should CLOSE the container. */ - final NodeManager nodeManager = new MockNodeManager(true, 10); - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(scheduler); - replicationActivityStatus.enableReplication(); - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); final ContainerReportHandler reportHandler = new ContainerReportHandler( - nodeManager, pipelineManager, containerManager, - replicationActivityStatus); + nodeManager, containerManager); final Iterator nodeIterator = nodeManager.getNodes( NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); final DatanodeDetails datanodeTwo = nodeIterator.next(); final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = getContainer(LifeCycleState.QUASI_CLOSED); final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( containerOne.containerID(), containerTwo.containerID()) .collect(Collectors.toSet()); @@ -526,139 +450,42 @@ public void testQuasiClosedWithDifferentOriginNodeReplica() nodeManager.setContainers(datanodeTwo, containerIDSet); nodeManager.setContainers(datanodeThree, containerIDSet); - addContainerToContainerManager( - containerManager, containerOne, containerOneReplicas); - addContainerToContainerManager( - containerManager, containerTwo, containerTwoReplicas); - mockUpdateContainerReplica( - containerManager, containerOne, containerOneReplicas); - mockUpdateContainerState(containerManager, containerOne, - LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED); + containerStateManager.loadContainer(containerOne); + containerStateManager.loadContainer(containerTwo); + + containerOneReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerTwo.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); + + containerTwoReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + containerTwo.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); + - // Container replica with datanodeOne as originNodeId is already - // QUASI_CLOSED. Now we will tell SCM that container replica from - // datanodeTwo is also QUASI_CLOSED, but has higher sequenceId. final ContainerReportsProto containerReport = getContainerReportsProto( - containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, - datanodeTwo.getUuidString(), 999999L); - - final EventPublisher publisher = Mockito.mock(EventPublisher.class); - final ContainerReportFromDatanode containerReportFromDatanode = - new ContainerReportFromDatanode(datanodeTwo, containerReport); - reportHandler.onMessage(containerReportFromDatanode, publisher); - - // Now we should get force close container event for containerOne on - // datanodeTwo - Mockito.verify(publisher, Mockito.times(1)) - .fireEvent(Mockito.any(), Mockito.any()); - // TODO: verify whether are actually getting a force close container - // datanode command for containerOne/datanodeTwo - - // The sequence id of the container should have been updated. - Assert.assertEquals(999999L, containerOne.getSequenceId()); - - // Now datanodeTwo should close containerOne. - final ContainerReportsProto containerReportTwo = getContainerReportsProto( containerOne.containerID(), ContainerReplicaProto.State.CLOSED, - datanodeTwo.getUuidString(), 999999L); - final ContainerReportFromDatanode containerReportFromDatanodeTwo = - new ContainerReportFromDatanode(datanodeTwo, containerReportTwo); - reportHandler.onMessage(containerReportFromDatanodeTwo, publisher); - - // The container should be closed in SCM now. - Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState()); - } - - @Test - public void testQuasiClosedWithSameOriginNodeReplica() - throws NodeNotFoundException, IOException { - /* - * The container is in QUASI_CLOSED state. - * - One of the replica is in QUASI_CLOSED state - * - The other two replica are in OPEN/CLOSING state - * - * The datanode reports a QUASI_CLOSED replica which has the same - * origin node id as the existing QUASI_CLOSED replica. - * - * In this case SCM should not CLOSE the container. - */ - final NodeManager nodeManager = new MockNodeManager(true, 10); - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final ReplicationActivityStatus replicationActivityStatus = - new ReplicationActivityStatus(scheduler); - replicationActivityStatus.enableReplication(); - - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); - final ContainerReportHandler reportHandler = new ContainerReportHandler( - nodeManager, pipelineManager, containerManager, - replicationActivityStatus); - final Iterator nodeIterator = nodeManager.getNodes( - NodeState.HEALTHY).iterator(); - final DatanodeDetails datanodeOne = nodeIterator.next(); - final DatanodeDetails datanodeTwo = nodeIterator.next(); - final DatanodeDetails datanodeThree = nodeIterator.next(); - final ContainerInfo containerOne = - getContainer(LifeCycleState.QUASI_CLOSED); - final ContainerInfo containerTwo = - getContainer(LifeCycleState.CLOSED); - final Set containerIDSet = Stream.of( - containerOne.containerID(), containerTwo.containerID()) - .collect(Collectors.toSet()); - final Set containerOneReplicas = getReplicas( - containerOne.containerID(), - ContainerReplicaProto.State.QUASI_CLOSED, - datanodeOne); - containerOneReplicas.addAll(getReplicas( - containerOne.containerID(), - ContainerReplicaProto.State.CLOSING, - datanodeTwo)); - final Set containerTwoReplicas = getReplicas( - containerTwo.containerID(), - ContainerReplicaProto.State.CLOSED, - datanodeOne, datanodeTwo, datanodeThree); - - nodeManager.setContainers(datanodeOne, containerIDSet); - nodeManager.setContainers(datanodeTwo, containerIDSet); - nodeManager.setContainers(datanodeThree, - Collections.singleton(containerTwo.containerID())); - - addContainerToContainerManager( - containerManager, containerOne, containerOneReplicas); - addContainerToContainerManager( - containerManager, containerTwo, containerTwoReplicas); - - mockUpdateContainerReplica( - containerManager, containerOne, containerOneReplicas); - mockUpdateContainerState(containerManager, containerOne, - LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED); - - // containerOne is QUASI_CLOSED in datanodeOne and CLOSING in datanodeTwo. - // Now datanodeThree is sending container report which says that it has - // containerOne replica, but the originNodeId of this replica is - // datanodeOne. In this case we should not force close the container even - // though we got two QUASI_CLOSED replicas. - final ContainerReportsProto containerReport = getContainerReportsProto( - containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, datanodeOne.getUuidString()); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final ContainerReportFromDatanode containerReportFromDatanode = - new ContainerReportFromDatanode(datanodeThree, containerReport); + new ContainerReportFromDatanode(datanodeOne, containerReport); reportHandler.onMessage(containerReportFromDatanode, publisher); - Mockito.verify(publisher, Mockito.times(0)) - .fireEvent(Mockito.any(), Mockito.any()); + Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState()); } private static ContainerReportsProto getContainerReportsProto( final ContainerID containerId, final ContainerReplicaProto.State state, final String originNodeId) { - return getContainerReportsProto(containerId, state, originNodeId, 100L); - } - - private static ContainerReportsProto getContainerReportsProto( - final ContainerID containerId, final ContainerReplicaProto.State state, - final String originNodeId, final long bcsid) { final ContainerReportsProto.Builder crBuilder = ContainerReportsProto.newBuilder(); final ContainerReplicaProto replicaProto = @@ -674,7 +501,7 @@ private static ContainerReportsProto getContainerReportsProto( .setWriteCount(100000000L) .setReadBytes(2000000000L) .setWriteBytes(2000000000L) - .setBlockCommitSequenceId(bcsid) + .setBlockCommitSequenceId(10000L) .setDeleteTransactionId(0) .build(); return crBuilder.addReports(replicaProto).build(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java deleted file mode 100644 index 860ec4d7fd..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.hadoop.hdds.scm.container; - -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; -import org.mockito.Mockito; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.util.Set; - -/** - * Helper methods for testing ContainerReportHandler and - * IncrementalContainerReportHandler. - */ -public final class TestContainerReportHelper { - - private TestContainerReportHelper() {} - - static void addContainerToContainerManager( - final ContainerManager containerManager, final ContainerInfo container, - final Set replicas) throws ContainerNotFoundException { - Mockito.when(containerManager.getContainer(container.containerID())) - .thenReturn(container); - Mockito.when( - containerManager.getContainerReplicas(container.containerID())) - .thenReturn(replicas); - } - - static void mockUpdateContainerReplica( - final ContainerManager containerManager, - final ContainerInfo containerInfo, final Set replicas) - throws ContainerNotFoundException { - Mockito.doAnswer((Answer) invocation -> { - Object[] args = invocation.getArguments(); - if (args[0].equals(containerInfo.containerID())) { - ContainerReplica replica = (ContainerReplica) args[1]; - replicas.remove(replica); - replicas.add(replica); - } - return null; - }).when(containerManager).updateContainerReplica( - Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); - } - - static void mockUpdateContainerState( - final ContainerManager containerManager, - final ContainerInfo containerInfo, - final LifeCycleEvent event, final LifeCycleState state) - throws IOException { - Mockito.doAnswer((Answer) invocation -> { - containerInfo.setState(state); - return containerInfo.getState(); - }).when(containerManager).updateContainerState( - containerInfo.containerID(), event); - } - -} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java index 6c9383f036..7b8f9fc7b0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java @@ -17,47 +17,78 @@ */ package org.apache.hadoop.hdds.scm.container; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; import org.apache.hadoop.hdds.scm.TestUtils; -import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .IncrementalContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; import java.io.IOException; import java.util.Set; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.addContainerToContainerManager; import static org.apache.hadoop.hdds.scm.TestUtils.getContainer; import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.mockUpdateContainerReplica; -import static org.apache.hadoop.hdds.scm.container - .TestContainerReportHelper.mockUpdateContainerState; /** * Test cases to verify the functionality of IncrementalContainerReportHandler. */ public class TestIncrementalContainerReportHandler { + private ContainerManager containerManager; + private ContainerStateManager containerStateManager; + private EventPublisher publisher; + + @Before + public void setup() throws IOException { + final Configuration conf = new OzoneConfiguration(); + this.containerManager = Mockito.mock(ContainerManager.class); + this.containerStateManager = new ContainerStateManager(conf); + this.publisher = Mockito.mock(EventPublisher.class); + + + Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class))) + .thenAnswer(invocation -> containerStateManager + .getContainer((ContainerID)invocation.getArguments()[0])); + + Mockito.when(containerManager.getContainerReplicas( + Mockito.any(ContainerID.class))) + .thenAnswer(invocation -> containerStateManager + .getContainerReplicas((ContainerID)invocation.getArguments()[0])); + + Mockito.doAnswer(invocation -> { + containerStateManager + .updateContainerState((ContainerID)invocation.getArguments()[0], + (HddsProtos.LifeCycleEvent)invocation.getArguments()[1]); + return null; + }).when(containerManager).updateContainerState( + Mockito.any(ContainerID.class), + Mockito.any(HddsProtos.LifeCycleEvent.class)); + + } + + @After + public void tearDown() throws IOException { + containerStateManager.close(); + } + + @Test public void testClosingToClosed() throws IOException { - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); final IncrementalContainerReportHandler reportHandler = - new IncrementalContainerReportHandler( - pipelineManager, containerManager); + new IncrementalContainerReportHandler(containerManager); final ContainerInfo container = getContainer(LifeCycleState.CLOSING); final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails(); final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails(); @@ -67,34 +98,31 @@ public void testClosingToClosed() throws IOException { ContainerReplicaProto.State.CLOSING, datanodeOne, datanodeTwo, datanodeThree); - addContainerToContainerManager( - containerManager, container, containerReplicas); - mockUpdateContainerReplica( - containerManager, container, containerReplicas); - mockUpdateContainerState(containerManager, container, - LifeCycleEvent.CLOSE, LifeCycleState.CLOSED); + containerStateManager.loadContainer(container); + containerReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + container.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); final IncrementalContainerReportProto containerReport = getIncrementalContainerReportProto(container.containerID(), ContainerReplicaProto.State.CLOSED, datanodeOne.getUuidString()); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); final IncrementalContainerReportFromDatanode icrFromDatanode = new IncrementalContainerReportFromDatanode( datanodeOne, containerReport); reportHandler.onMessage(icrFromDatanode, publisher); - Assert.assertEquals( - LifeCycleState.CLOSED, container.getState()); + Assert.assertEquals(LifeCycleState.CLOSED, container.getState()); } @Test public void testClosingToQuasiClosed() throws IOException { - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); final IncrementalContainerReportHandler reportHandler = - new IncrementalContainerReportHandler( - pipelineManager, containerManager); + new IncrementalContainerReportHandler(containerManager); final ContainerInfo container = getContainer(LifeCycleState.CLOSING); final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails(); final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails(); @@ -104,34 +132,32 @@ public void testClosingToQuasiClosed() throws IOException { ContainerReplicaProto.State.CLOSING, datanodeOne, datanodeTwo, datanodeThree); - addContainerToContainerManager( - containerManager, container, containerReplicas); - mockUpdateContainerReplica( - containerManager, container, containerReplicas); - mockUpdateContainerState(containerManager, container, - LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED); + containerStateManager.loadContainer(container); + containerReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + container.containerID(), r); + } catch (ContainerNotFoundException ignored) { + + } + }); + final IncrementalContainerReportProto containerReport = getIncrementalContainerReportProto(container.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, datanodeOne.getUuidString()); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); final IncrementalContainerReportFromDatanode icrFromDatanode = new IncrementalContainerReportFromDatanode( datanodeOne, containerReport); reportHandler.onMessage(icrFromDatanode, publisher); - Assert.assertEquals( - LifeCycleState.QUASI_CLOSED, container.getState()); + Assert.assertEquals(LifeCycleState.QUASI_CLOSED, container.getState()); } @Test public void testQuasiClosedToClosed() throws IOException { - final ContainerManager containerManager = Mockito.mock( - ContainerManager.class); - final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); final IncrementalContainerReportHandler reportHandler = - new IncrementalContainerReportHandler( - pipelineManager, containerManager); + new IncrementalContainerReportHandler(containerManager); final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED); final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails(); final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails(); @@ -145,38 +171,25 @@ public void testQuasiClosedToClosed() throws IOException { ContainerReplicaProto.State.QUASI_CLOSED, datanodeThree)); + containerStateManager.loadContainer(container); + containerReplicas.forEach(r -> { + try { + containerStateManager.updateContainerReplica( + container.containerID(), r); + } catch (ContainerNotFoundException ignored) { - addContainerToContainerManager( - containerManager, container, containerReplicas); - mockUpdateContainerReplica( - containerManager, container, containerReplicas); - mockUpdateContainerState(containerManager, container, - LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED); + } + }); final IncrementalContainerReportProto containerReport = getIncrementalContainerReportProto(container.containerID(), - ContainerReplicaProto.State.QUASI_CLOSED, - datanodeOne.getUuidString(), 999999L); - final EventPublisher publisher = Mockito.mock(EventPublisher.class); - final IncrementalContainerReportFromDatanode icrFromDatanode = + ContainerReplicaProto.State.CLOSED, + datanodeThree.getUuidString()); + final IncrementalContainerReportFromDatanode icr = new IncrementalContainerReportFromDatanode( datanodeOne, containerReport); - reportHandler.onMessage(icrFromDatanode, publisher); - - // SCM should issue force close. - Mockito.verify(publisher, Mockito.times(1)) - .fireEvent(Mockito.any(), Mockito.any()); - - final IncrementalContainerReportProto containerReportTwo = - getIncrementalContainerReportProto(container.containerID(), - ContainerReplicaProto.State.CLOSED, - datanodeOne.getUuidString(), 999999L); - final IncrementalContainerReportFromDatanode icrTwoFromDatanode = - new IncrementalContainerReportFromDatanode( - datanodeOne, containerReportTwo); - reportHandler.onMessage(icrTwoFromDatanode, publisher); - Assert.assertEquals( - LifeCycleState.CLOSED, container.getState()); + reportHandler.onMessage(icr, publisher); + Assert.assertEquals(LifeCycleState.CLOSED, container.getState()); } private static IncrementalContainerReportProto @@ -184,15 +197,6 @@ public void testQuasiClosedToClosed() throws IOException { final ContainerID containerId, final ContainerReplicaProto.State state, final String originNodeId) { - return getIncrementalContainerReportProto( - containerId, state, originNodeId, 100L); - } - - private static IncrementalContainerReportProto - getIncrementalContainerReportProto( - final ContainerID containerId, - final ContainerReplicaProto.State state, - final String originNodeId, final long bcsid) { final IncrementalContainerReportProto.Builder crBuilder = IncrementalContainerReportProto.newBuilder(); final ContainerReplicaProto replicaProto = @@ -208,7 +212,7 @@ public void testQuasiClosedToClosed() throws IOException { .setWriteCount(100000000L) .setReadBytes(2000000000L) .setWriteBytes(2000000000L) - .setBlockCommitSequenceId(bcsid) + .setBlockCommitSequenceId(10000L) .setDeleteTransactionId(0) .build(); return crBuilder.addReport(replicaProto).build(); diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java deleted file mode 100644 index c36ba75079..0000000000 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationActivityStatus.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.  See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.  The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.  You may obtain a copy of the License at - * - *      http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.container.replication; - -import static org.junit.Assert.*; - -import java.util.concurrent.TimeoutException; - -import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.scm.block.BlockManager; -import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; -import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler; -import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer; -import org.apache.hadoop.hdds.server.events.EventQueue; -import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.utils.Scheduler; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; - -/** - * Tests for ReplicationActivityStatus. - */ -public class TestReplicationActivityStatus { - - private static EventQueue eventQueue; - private static ReplicationActivityStatus replicationActivityStatus; - - @BeforeClass - public static void setup() { - eventQueue = new EventQueue(); - replicationActivityStatus = new ReplicationActivityStatus( - new Scheduler("SCMCommonScheduler", false, 1)); - - OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); - ozoneConfiguration.set(HddsConfigKeys. - HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT, "3s"); - - SCMClientProtocolServer scmClientProtocolServer = - Mockito.mock(SCMClientProtocolServer.class); - BlockManager blockManager = Mockito.mock(BlockManagerImpl.class); - ChillModeHandler chillModeHandler = - new ChillModeHandler(ozoneConfiguration, scmClientProtocolServer, - blockManager, replicationActivityStatus); - eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler); - - } - - @Test - public void testReplicationStatusForChillMode() - throws TimeoutException, InterruptedException { - assertFalse(replicationActivityStatus.isReplicationEnabled()); - // In chill mode replication process should be stopped. - eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, - new SCMChillModeManager.ChillModeStatus(true)); - assertFalse(replicationActivityStatus.isReplicationEnabled()); - - // Replication should be enabled when chill mode if off. - eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, - new SCMChillModeManager.ChillModeStatus(false)); - GenericTestUtils.waitFor(() -> { - return replicationActivityStatus.isReplicationEnabled(); - }, 10, 1000*5); - assertTrue(replicationActivityStatus.isReplicationEnabled()); - } -} \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java index 1846b0c4b8..ea59af3d7f 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMClientProtocolServer.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus; +import org.apache.hadoop.hdds.scm.container.ReplicationManager; import org.apache.hadoop.hdds.scm.events.SCMEvents; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.server.events.EventQueue; @@ -48,10 +48,10 @@ public void setUp() throws Exception { eventQueue = new EventQueue(); scmClientProtocolServer = new SCMClientProtocolServer(config, null); BlockManager blockManager = Mockito.mock(BlockManagerImpl.class); - ReplicationActivityStatus replicationActivityStatus = - Mockito.mock(ReplicationActivityStatus.class); + ReplicationManager replicationManager = + Mockito.mock(ReplicationManager.class); ChillModeHandler chillModeHandler = new ChillModeHandler(config, - scmClientProtocolServer, blockManager, replicationActivityStatus); + scmClientProtocolServer, blockManager, replicationManager); eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler); }