diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java index 1edd9732ef..0f4d3c5a10 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java @@ -70,6 +70,10 @@ public class ContainerInfo implements Comparator, private String owner; private long containerID; private long deleteTransactionId; + // The sequenceId of a close container cannot change, and all the + // container replica should have the same sequenceId. + private long sequenceId; + /** * Allows you to maintain private data on ContainerInfo. This is not * serialized via protobuf, just allows us to maintain some private data. @@ -86,6 +90,7 @@ public class ContainerInfo implements Comparator, long stateEnterTime, String owner, long deleteTransactionId, + long sequenceId, ReplicationFactor replicationFactor, ReplicationType repType) { this.containerID = containerID; @@ -97,6 +102,7 @@ public class ContainerInfo implements Comparator, this.stateEnterTime = stateEnterTime; this.owner = owner; this.deleteTransactionId = deleteTransactionId; + this.sequenceId = sequenceId; this.replicationFactor = replicationFactor; this.replicationType = repType; } @@ -105,8 +111,8 @@ public ContainerInfo(ContainerInfo info) { this(info.getContainerID(), info.getState(), info.getPipelineID(), info.getUsedBytes(), info.getNumberOfKeys(), info.getStateEnterTime(), info.getOwner(), - info.getDeleteTransactionId(), info.getReplicationFactor(), - info.getReplicationType()); + info.getDeleteTransactionId(), info.getSequenceId(), + info.getReplicationFactor(), info.getReplicationType()); } /** * Needed for serialization findbugs. @@ -174,10 +180,19 @@ public long getDeleteTransactionId() { return deleteTransactionId; } + public long getSequenceId() { + return sequenceId; + } + public void updateDeleteTransactionId(long transactionId) { deleteTransactionId = max(transactionId, deleteTransactionId); } + public void updateSequenceId(long sequenceID) { + assert (isOpen() || state == HddsProtos.LifeCycleState.QUASI_CLOSED); + sequenceId = max(sequenceID, sequenceId); + } + public ContainerID containerID() { return new ContainerID(getContainerID()); } @@ -380,6 +395,7 @@ public static class Builder { private String owner; private long containerID; private long deleteTransactionId; + private long sequenceId; private PipelineID pipelineID; private ReplicationFactor replicationFactor; private ReplicationType replicationType; @@ -436,10 +452,15 @@ public Builder setDeleteTransactionId(long deleteTransactionID) { return this; } + public Builder setSequenceId(long sequenceID) { + this.sequenceId = sequenceID; + return this; + } + public ContainerInfo build() { return new ContainerInfo(containerID, state, pipelineID, - used, keys, stateEnterTime, owner, deleteTransactionId, - replicationFactor, replicationType); + used, keys, stateEnterTime, owner, deleteTransactionId, + sequenceId, replicationFactor, replicationType); } } diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto index c37683a9cc..cf3d6d4519 100644 --- a/hadoop-hdds/common/src/main/proto/hdds.proto +++ b/hadoop-hdds/common/src/main/proto/hdds.proto @@ -113,16 +113,19 @@ message NodePool { enum LifeCycleState { OPEN = 1; CLOSING = 2; - CLOSED = 3; - DELETING = 4; - DELETED = 5; // object is deleted. + QUASI_CLOSED = 3; + CLOSED = 4; + DELETING = 5; + DELETED = 6; // object is deleted. } enum LifeCycleEvent { FINALIZE = 1; - CLOSE = 2; // !!Event after this has not been used yet. - DELETE = 3; - CLEANUP = 4; + QUASI_CLOSE = 2; + CLOSE = 3; // !!Event after this has not been used yet. + FORCE_CLOSE = 4; + DELETE = 5; + CLEANUP = 6; } message ContainerInfoProto { @@ -134,8 +137,9 @@ message ContainerInfoProto { optional int64 stateEnterTime = 6; required string owner = 7; optional int64 deleteTransactionId = 8; - required ReplicationFactor replicationFactor = 9; - required ReplicationType replicationType = 10; + optional int64 sequenceId = 9; + required ReplicationFactor replicationFactor = 10; + required ReplicationType replicationType = 11; } message ContainerWithPipeline { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java index c9fe9e4419..0170caae86 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReportHandler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdds.scm.container; +import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -154,10 +155,13 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails, try { final ContainerID containerID = ContainerID.valueof( replicaProto.getContainerID()); + + ReportHandlerHelper.processContainerReplica(containerManager, + containerID, replicaProto, datanodeDetails, publisher, LOG); + final ContainerInfo containerInfo = containerManager .getContainer(containerID); - updateContainerState(datanodeDetails, containerInfo, - replicaProto, publisher); + if (containerInfo.getDeleteTransactionId() > replicaProto.getDeleteTransactionId()) { pendingDeleteStatusList @@ -166,8 +170,12 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails, containerInfo.getContainerID()); } } catch (ContainerNotFoundException e) { - LOG.error("Received container report for an unknown container {}", - replicaProto.getContainerID()); + LOG.error("Received container report for an unknown container {} from" + + " datanode {}", replicaProto.getContainerID(), datanodeDetails); + } catch (IOException e) { + LOG.error("Exception while processing container report for container" + + " {} from datanode {}", + replicaProto.getContainerID(), datanodeDetails); } } if (pendingDeleteStatusList.getNumPendingDeletes() > 0) { @@ -176,40 +184,6 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails, } } - private void updateContainerState(final DatanodeDetails datanodeDetails, - final ContainerInfo containerInfo, - final ContainerReplicaProto replicaProto, - final EventPublisher publisher) - throws ContainerNotFoundException { - - final ContainerID id = containerInfo.containerID(); - final ContainerReplica datanodeContainerReplica = ContainerReplica - .newBuilder() - .setContainerID(id) - .setContainerState(replicaProto.getState()) - .setDatanodeDetails(datanodeDetails) - .build(); - // TODO: Add bcsid and origin datanode to replica. - - final ContainerReplica scmContainerReplica = containerManager - .getContainerReplicas(id) - .stream() - .filter(replica -> - replica.getDatanodeDetails().equals(datanodeDetails)) - .findFirst().orElse(null); - - // This is an in-memory update. - containerManager.updateContainerReplica(id, datanodeContainerReplica); - containerInfo.setUsedBytes(replicaProto.getUsed()); - containerInfo.setNumberOfKeys(replicaProto.getKeyCount()); - - // Check if there is state change in container replica. - if (scmContainerReplica == null || - scmContainerReplica.getState() != datanodeContainerReplica.getState()) { - //TODO: Handler replica state change. - } - } - private void checkReplicationState(ContainerID containerID, EventPublisher publisher) { try { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java index f4bb082b1e..870eeb2d45 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java @@ -156,40 +156,64 @@ public ContainerStateManager(final Configuration configuration) { * * Event and State Transition Mapping: * - * State: OPEN ---------------> CLOSING - * Event: FINALIZE + * State: OPEN ----------------> CLOSING + * Event: FINALIZE * - * State: CLOSING ---------------> CLOSED - * Event: CLOSE + * State: CLOSING ----------------> QUASI_CLOSED + * Event: QUASI_CLOSE * - * State: CLOSED ----------------> DELETING - * Event: DELETE + * State: CLOSING ----------------> CLOSED + * Event: CLOSE * - * State: DELETING ----------------> DELETED - * Event: CLEANUP + * State: QUASI_CLOSED ----------------> CLOSED + * Event: FORCE_CLOSE + * + * State: CLOSED ----------------> DELETING + * Event: DELETE + * + * State: DELETING ----------------> DELETED + * Event: CLEANUP * * * Container State Flow: * - * [OPEN]-------->[CLOSING]------->[CLOSED] - * (FINALIZE) (CLOSE) | - * | - * | - * (DELETE)| - * | - * | - * [DELETING] ----------> [DELETED] - * (CLEANUP) + * [OPEN]--------------->[CLOSING]--------------->[QUASI_CLOSED] + * (FINALIZE) | (QUASI_CLOSE) | + * | | + * | | + * (CLOSE) | (FORCE_CLOSE) | + * | | + * | | + * +--------->[CLOSED]<--------+ + * | + * (DELETE)| + * | + * | + * [DELETING] + * | + * (CLEANUP) | + * | + * V + * [DELETED] + * */ private void initializeStateMachine() { stateMachine.addTransition(LifeCycleState.OPEN, LifeCycleState.CLOSING, LifeCycleEvent.FINALIZE); + stateMachine.addTransition(LifeCycleState.CLOSING, + LifeCycleState.QUASI_CLOSED, + LifeCycleEvent.QUASI_CLOSE); + stateMachine.addTransition(LifeCycleState.CLOSING, LifeCycleState.CLOSED, LifeCycleEvent.CLOSE); + stateMachine.addTransition(LifeCycleState.QUASI_CLOSED, + LifeCycleState.CLOSED, + LifeCycleEvent.FORCE_CLOSE); + stateMachine.addTransition(LifeCycleState.CLOSED, LifeCycleState.DELETING, LifeCycleEvent.DELETE); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java index e07ee73d4f..d70edfbe3a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/IncrementalContainerReportHandler.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; @@ -66,26 +65,8 @@ public void onMessage( .getDatanodeDetails(); final ContainerID containerID = ContainerID .valueof(replicaProto.getContainerID()); - final ContainerInfo containerInfo = containerManager - .getContainer(containerID); - - ContainerReplica replica = ContainerReplica.newBuilder() - .setContainerID(ContainerID.valueof(replicaProto.getContainerID())) - .setContainerState(replicaProto.getState()) - .setDatanodeDetails(datanodeDetails) - .build(); - - containerManager.updateContainerReplica(containerID, replica); - - // Check if the state of the container is changed. - if (replicaProto.getState() == ContainerReplicaProto.State.CLOSED && - containerInfo.getState() == HddsProtos.LifeCycleState.CLOSING) { - containerManager.updateContainerState(containerID, - HddsProtos.LifeCycleEvent.CLOSE); - } - - // TODO: Handler replica state change - + ReportHandlerHelper.processContainerReplica(containerManager, + containerID, replicaProto, datanodeDetails, publisher, LOG); } catch (ContainerNotFoundException e) { LOG.warn("Container {} not found!", replicaProto.getContainerID()); } catch (IOException e) { @@ -95,4 +76,5 @@ public void onMessage( } } + } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java new file mode 100644 index 0000000000..d9c30909f1 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND; + +/** + * Helper functions to handler container reports. + */ +public final class ReportHandlerHelper { + + private ReportHandlerHelper() {} + + /** + * Processes the container replica and updates the container state in SCM. + * If needed, sends command to datanode to update the replica state. + * + * @param containerManager ContainerManager instance + * @param containerId Id of the container + * @param replicaProto replica of the container + * @param datanodeDetails datanode where the replica resides + * @param publisher event publisher + * @param logger for logging + * @throws IOException + */ + static void processContainerReplica(final ContainerManager containerManager, + final ContainerID containerId, final ContainerReplicaProto replicaProto, + final DatanodeDetails datanodeDetails, final EventPublisher publisher, + final Logger logger) throws IOException { + + final ContainerReplica replica = ContainerReplica.newBuilder() + .setContainerID(containerId) + .setContainerState(replicaProto.getState()) + .setDatanodeDetails(datanodeDetails) + .setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId())) + .setSequenceId(replicaProto.getBlockCommitSequenceId()) + .build(); + + // This is an in-memory update. + containerManager.updateContainerReplica(containerId, replica); + ReportHandlerHelper.reconcileContainerState(containerManager, + containerId, publisher, logger); + + final ContainerInfo containerInfo = containerManager + .getContainer(containerId); + if (containerInfo.getUsedBytes() < replicaProto.getUsed()) { + containerInfo.setUsedBytes(replicaProto.getUsed()); + } + + if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) { + containerInfo.setNumberOfKeys(replicaProto.getKeyCount()); + } + + // Now we have reconciled the container state. If the container state and + // the replica state doesn't match, then take appropriate action. + ReportHandlerHelper.sendReplicaCommands( + datanodeDetails, containerInfo, replica, publisher, logger); + } + + + /** + * Reconcile the container state based on the ContainerReplica states. + * ContainerState is updated after the reconciliation. + * + * @param manager ContainerManager + * @param containerId container id + * @throws ContainerNotFoundException + */ + private static void reconcileContainerState(final ContainerManager manager, + final ContainerID containerId, final EventPublisher publisher, + final Logger logger) throws IOException { + // TODO: handle unhealthy replica. + synchronized (manager.getContainer(containerId)) { + final ContainerInfo container = manager.getContainer(containerId); + final Set replicas = manager.getContainerReplicas( + containerId); + final LifeCycleState containerState = container.getState(); + switch (containerState) { + case OPEN: + /* + * If the container state is OPEN. + * None of the replica should be in any other state. + * + */ + List invalidReplicas = replicas.stream() + .filter(replica -> replica.getState() != State.OPEN) + .collect(Collectors.toList()); + if (!invalidReplicas.isEmpty()) { + logger.warn("Container {} has invalid replica state." + + "Invalid Replicas: {}", containerId, invalidReplicas); + } + // A container cannot be over replicated when in OPEN state. + break; + case CLOSING: + /* + * SCM has asked DataNodes to close the container. Now the replicas + * can be in any of the following states. + * + * 1. OPEN + * 2. CLOSING + * 3. QUASI_CLOSED + * 4. CLOSED + * + * If all the replica are either in OPEN or CLOSING state, do nothing. + * + * If any one of the replica is in QUASI_CLOSED state, move the + * container to QUASI_CLOSED state. + * + * If any one of the replica is in CLOSED state, mark the container as + * CLOSED. The close has happened via Ratis. + * + */ + Optional closedReplica = replicas.stream() + .filter(replica -> replica.getState() == State.CLOSED) + .findFirst(); + if (closedReplica.isPresent()) { + container.updateSequenceId(closedReplica.get().getSequenceId()); + manager.updateContainerState( + containerId, HddsProtos.LifeCycleEvent.CLOSE); + + // TODO: remove container from OPEN pipeline, since the container is + // closed we can go ahead and remove it from Ratis pipeline. + } else if (replicas.stream() + .anyMatch(replica -> replica.getState() == State.QUASI_CLOSED)) { + manager.updateContainerState( + containerId, HddsProtos.LifeCycleEvent.QUASI_CLOSE); + } + break; + case QUASI_CLOSED: + /* + * The container is in QUASI_CLOSED state, this means that at least + * one of the replica is in QUASI_CLOSED/CLOSED state. + * Other replicas can be in any of the following state. + * + * 1. OPEN + * 2. CLOSING + * 3. QUASI_CLOSED + * 4. CLOSED + * + * If <50% of container replicas are in QUASI_CLOSED state and all + * the other replica are either in OPEN or CLOSING state, do nothing. + * We cannot identify the correct replica since we don't have quorum + * yet. + * + * If >50% (quorum) of replicas are in QUASI_CLOSED state and other + * replicas are either in OPEN or CLOSING state, try to identify + * the latest container replica using originNodeId and sequenceId. + * Force close those replica(s) which have the latest sequenceId. + * + * If at least one of the replica is in CLOSED state, mark the + * container as CLOSED. Force close the replicas which matches the + * sequenceId of the CLOSED replica. + * + */ + if (replicas.stream() + .anyMatch(replica -> replica.getState() == State.CLOSED)) { + manager.updateContainerState( + containerId, HddsProtos.LifeCycleEvent.FORCE_CLOSE); + // TODO: remove container from OPEN pipeline, since the container is + // closed we can go ahead and remove it from Ratis pipeline. + } else { + final int replicationFactor = container + .getReplicationFactor().getNumber(); + final List quasiClosedReplicas = replicas.stream() + .filter(replica -> replica.getState() == State.QUASI_CLOSED) + .collect(Collectors.toList()); + final long uniqueQuasiClosedReplicaCount = quasiClosedReplicas + .stream() + .map(ContainerReplica::getOriginDatanodeId) + .distinct() + .count(); + + float quasiClosePercent = ((float) uniqueQuasiClosedReplicaCount) / + ((float) replicationFactor); + + if (quasiClosePercent > 0.5F) { + // Quorum of unique replica has been QUASI_CLOSED + long sequenceId = forceCloseContainerReplicaWithHighestSequenceId( + container, quasiClosedReplicas, publisher); + if (sequenceId != -1L) { + container.updateSequenceId(sequenceId); + } + } + } + break; + case CLOSED: + /* + * The container is already in closed state. do nothing. + */ + break; + case DELETING: + // Not handled. + throw new UnsupportedOperationException("Unsupported container state" + + " 'DELETING'."); + case DELETED: + // Not handled. + throw new UnsupportedOperationException("Unsupported container state" + + " 'DELETED'."); + default: + break; + } + } + } + + /** + * Compares the QUASI_CLOSED replicas of a container and sends close command. + * + * @param quasiClosedReplicas list of quasi closed replicas + * @return the sequenceId of the closed replica. + */ + private static long forceCloseContainerReplicaWithHighestSequenceId( + final ContainerInfo container, + final List quasiClosedReplicas, + final EventPublisher publisher) { + + final long highestSequenceId = quasiClosedReplicas.stream() + .map(ContainerReplica::getSequenceId) + .max(Long::compare) + .orElse(-1L); + + if (highestSequenceId != -1L) { + quasiClosedReplicas.stream() + .filter(replica -> replica.getSequenceId() == highestSequenceId) + .forEach(replica -> { + CloseContainerCommand closeContainerCommand = + new CloseContainerCommand(container.getContainerID(), + container.getPipelineID(), true); + publisher.fireEvent(DATANODE_COMMAND, + new CommandForDatanode<>( + replica.getDatanodeDetails().getUuid(), + closeContainerCommand)); + }); + } + return highestSequenceId; + } + + /** + * Based on the container and replica state, send command to datanode if + * required. + * + * @param datanodeDetails datanode where the replica resides + * @param containerInfo container information + * @param replica replica information + * @param publisher queue to publish the datanode command event + * @param log for logging + */ + static void sendReplicaCommands( + final DatanodeDetails datanodeDetails, + final ContainerInfo containerInfo, + final ContainerReplica replica, + final EventPublisher publisher, + final Logger log) { + final HddsProtos.LifeCycleState containerState = containerInfo.getState(); + final ContainerReplicaProto.State replicaState = replica.getState(); + + if(!ReportHandlerHelper.compareState(containerState, replicaState)) { + if (containerState == HddsProtos.LifeCycleState.OPEN) { + // When a container state in SCM is OPEN, there is no way a datanode + // can quasi close/close the container. + log.warn("Invalid container replica state for container {}" + + " from datanode {}. Expected state is OPEN.", + containerInfo.containerID(), datanodeDetails); + // The replica can go CORRUPT, we have to handle it. + } + if (containerState == HddsProtos.LifeCycleState.CLOSING || + containerState == HddsProtos.LifeCycleState.QUASI_CLOSED) { + // Resend close container event for this datanode if the container + // replica state is OPEN/CLOSING. + if (replicaState == ContainerReplicaProto.State.OPEN || + replicaState == ContainerReplicaProto.State.CLOSING) { + CloseContainerCommand closeContainerCommand = + new CloseContainerCommand(containerInfo.getContainerID(), + containerInfo.getPipelineID()); + publisher.fireEvent(DATANODE_COMMAND, + new CommandForDatanode<>( + replica.getDatanodeDetails().getUuid(), + closeContainerCommand)); + } + } + if (containerState == HddsProtos.LifeCycleState.CLOSED) { + if (replicaState == ContainerReplicaProto.State.OPEN || + replicaState == ContainerReplicaProto.State.CLOSING || + replicaState == ContainerReplicaProto.State.QUASI_CLOSED) { + // Send force close container event for this datanode if the container + // replica state is OPEN/CLOSING/QUASI_CLOSED. + + // Close command will be send only if this replica matches the + // sequence of the container. + if (containerInfo.getSequenceId() == + replica.getSequenceId()) { + CloseContainerCommand closeContainerCommand = + new CloseContainerCommand(containerInfo.getContainerID(), + containerInfo.getPipelineID(), true); + publisher.fireEvent(DATANODE_COMMAND, + new CommandForDatanode<>( + replica.getDatanodeDetails().getUuid(), + closeContainerCommand)); + } + // TODO: delete the replica if the BCSID doesn't match. + } + } + } + + } + + /** + * Compares the container and replica state. + * + * @param containerState container state + * @param replicaState replica state + * @return true if the states are same, else false + */ + private static boolean compareState(final LifeCycleState containerState, + final State replicaState) { + // TODO: handle unhealthy replica. + switch (containerState) { + case OPEN: + return replicaState == State.OPEN; + case CLOSING: + return replicaState == State.CLOSING; + case QUASI_CLOSED: + return replicaState == State.QUASI_CLOSED; + case CLOSED: + return replicaState == State.CLOSED; + case DELETING: + return false; + case DELETED: + return false; + default: + return false; + } + } + +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 69cc21578d..33fa1fac87 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -94,6 +94,7 @@ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) { if (initializeFakeNodes) { for (int x = 0; x < nodeCount; x++) { DatanodeDetails dd = TestUtils.randomDatanodeDetails(); + register(dd, null, null); populateNodeMetric(dd, x); } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java index 14f516dd2e..4981c4bba0 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java @@ -16,208 +16,654 @@ */ package org.apache.hadoop.hdds.scm.container; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; - -import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; -import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; import org.apache.hadoop.hdds.scm.container.replication .ReplicationActivityStatus; -import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; -import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager; -import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher - .ContainerReportFromDatanode; -import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.scm.server + .SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode; import org.apache.hadoop.hdds.server.events.EventPublisher; - -import org.apache.hadoop.hdds.server.events.EventQueue; -import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.getReplicas; +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.getContainer; +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.addContainerToContainerManager; +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.mockUpdateContainerReplica; +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.mockUpdateContainerState; /** * Test the behaviour of the ContainerReportHandler. */ -public class TestContainerReportHandler implements EventPublisher { +public class TestContainerReportHandler { - private List publishedEvents = new ArrayList<>(); - private final NodeManager nodeManager = new MockNodeManager(true, 15); - private static final Logger LOG = - LoggerFactory.getLogger(TestContainerReportHandler.class); - - @Before - public void resetEventCollector() { - publishedEvents.clear(); - } - - //TODO: Rewrite it - @Ignore @Test - public void test() throws IOException, NodeNotFoundException { - String testDir = GenericTestUtils.getTempPath( - this.getClass().getSimpleName()); - //GIVEN - OzoneConfiguration conf = new OzoneConfiguration(); - conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir); - EventQueue eventQueue = new EventQueue(); - PipelineManager pipelineManager = - new SCMPipelineManager(conf, nodeManager, eventQueue); - SCMContainerManager containerManager = new SCMContainerManager( - conf, nodeManager, pipelineManager, eventQueue); + public void testUnderReplicatedContainer() + throws NodeNotFoundException, ContainerNotFoundException, + ContainerReplicaNotFoundException { - ReplicationActivityStatus replicationActivityStatus = + final NodeManager nodeManager = new MockNodeManager(true, 10); + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final ReplicationActivityStatus replicationActivityStatus = new ReplicationActivityStatus(); - - ContainerReportHandler reportHandler = - new ContainerReportHandler(nodeManager, pipelineManager, - containerManager, replicationActivityStatus); - - DatanodeDetails dn1 = TestUtils.randomDatanodeDetails(); - DatanodeDetails dn2 = TestUtils.randomDatanodeDetails(); - DatanodeDetails dn3 = TestUtils.randomDatanodeDetails(); - DatanodeDetails dn4 = TestUtils.randomDatanodeDetails(); - nodeManager.setContainers(dn1, new HashSet<>()); - nodeManager.setContainers(dn2, new HashSet<>()); - nodeManager.setContainers(dn3, new HashSet<>()); - nodeManager.setContainers(dn4, new HashSet<>()); - - ContainerInfo cont1 = containerManager - .allocateContainer(ReplicationType.STAND_ALONE, - ReplicationFactor.THREE, "root"); - ContainerInfo cont2 = containerManager - .allocateContainer(ReplicationType.STAND_ALONE, - ReplicationFactor.THREE, "root"); - // Open Container - ContainerInfo cont3 = containerManager - .allocateContainer(ReplicationType.STAND_ALONE, - ReplicationFactor.THREE, "root"); - - long c1 = cont1.getContainerID(); - long c2 = cont2.getContainerID(); - long c3 = cont3.getContainerID(); - - // Close remaining containers - TestUtils.closeContainer(containerManager, cont1.containerID()); - TestUtils.closeContainer(containerManager, cont2.containerID()); - - //when - - //initial reports before replication is enabled. 2 containers w 3 replicas. - reportHandler.onMessage( - new ContainerReportFromDatanode(dn1, - createContainerReport(new long[] {c1, c2, c3})), this); - - reportHandler.onMessage( - new ContainerReportFromDatanode(dn2, - createContainerReport(new long[] {c1, c2, c3})), this); - - reportHandler.onMessage( - new ContainerReportFromDatanode(dn3, - createContainerReport(new long[] {c1, c2})), this); - - reportHandler.onMessage( - new ContainerReportFromDatanode(dn4, - createContainerReport(new long[] {})), this); - - Assert.assertEquals(0, publishedEvents.size()); - replicationActivityStatus.enableReplication(); - //no problem here - reportHandler.onMessage( - new ContainerReportFromDatanode(dn1, - createContainerReport(new long[] {c1, c2})), this); + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, pipelineManager, containerManager, + replicationActivityStatus); + final Iterator nodeIterator = nodeManager.getNodes( + NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final DatanodeDetails datanodeTwo = nodeIterator.next(); + final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED); + final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( + containerOne.containerID(), containerTwo.containerID()) + .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( + containerOne.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); + final Set containerTwoReplicas = getReplicas( + containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); - Assert.assertEquals(0, publishedEvents.size()); + nodeManager.setContainers(datanodeOne, containerIDSet); + nodeManager.setContainers(datanodeTwo, containerIDSet); + nodeManager.setContainers(datanodeThree, containerIDSet); - //container is missing from d2 - reportHandler.onMessage( - new ContainerReportFromDatanode(dn2, - createContainerReport(new long[] {c1})), this); + addContainerToContainerManager( + containerManager, containerOne, containerOneReplicas); + addContainerToContainerManager( + containerManager, containerTwo, containerTwoReplicas); - Assert.assertEquals(1, publishedEvents.size()); - ReplicationRequest replicationRequest = - (ReplicationRequest) publishedEvents.get(0); + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + if (args[0].equals(containerOne.containerID())) { + ContainerReplica replica = (ContainerReplica) args[1]; + containerOneReplicas.remove(replica); + } + return null; + }).when(containerManager).removeContainerReplica( + Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); - Assert.assertEquals(c2, replicationRequest.getContainerId()); - Assert.assertEquals(3, replicationRequest.getExpecReplicationCount()); - Assert.assertEquals(2, replicationRequest.getReplicationCount()); - //container was replicated to dn4 - reportHandler.onMessage( - new ContainerReportFromDatanode(dn4, - createContainerReport(new long[] {c2})), this); + Mockito.when( + containerManager.getContainerReplicas(containerOne.containerID())) + .thenReturn(containerOneReplicas); + Mockito.when( + containerManager.getContainerReplicas(containerTwo.containerID())) + .thenReturn(containerTwoReplicas); - //no more event, everything is perfect - Assert.assertEquals(1, publishedEvents.size()); + // SCM expects both containerOne and containerTwo to be in all the three + // datanodes datanodeOne, datanodeTwo and datanodeThree - //c2 was found at dn2 (it was missing before, magic) - reportHandler.onMessage( - new ContainerReportFromDatanode(dn2, - createContainerReport(new long[] {c1, c2})), this); + // Now datanodeOne is sending container report in which containerOne is + // missing. - //c2 is over replicated (dn1,dn2,dn3,dn4) - Assert.assertEquals(2, publishedEvents.size()); + // containerOne becomes under replicated. + final ContainerReportsProto containerReport = getContainerReportsProto( + containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne.getUuidString()); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final ContainerReportFromDatanode containerReportFromDatanode = + new ContainerReportFromDatanode(datanodeOne, containerReport); + reportHandler.onMessage(containerReportFromDatanode, publisher); - replicationRequest = - (ReplicationRequest) publishedEvents.get(1); - - Assert.assertEquals(c2, replicationRequest.getContainerId()); - Assert.assertEquals(3, replicationRequest.getExpecReplicationCount()); - Assert.assertEquals(4, replicationRequest.getReplicationCount()); + // Now we should get a replication request for containerOne + Mockito.verify(publisher, Mockito.times(1)) + .fireEvent(Mockito.any(), Mockito.any()); + // TODO: verify whether are actually getting a replication request event + // for containerOne } - private ContainerReportsProto createContainerReport(long[] containerIds) { + @Test + public void testOverReplicatedContainer() throws NodeNotFoundException, + ContainerNotFoundException { - ContainerReportsProto.Builder crBuilder = + final NodeManager nodeManager = new MockNodeManager(true, 10); + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final ReplicationActivityStatus replicationActivityStatus = + new ReplicationActivityStatus(); + replicationActivityStatus.enableReplication(); + + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, pipelineManager, containerManager, + replicationActivityStatus); + final Iterator nodeIterator = nodeManager.getNodes( + NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final DatanodeDetails datanodeTwo = nodeIterator.next(); + final DatanodeDetails datanodeThree = nodeIterator.next(); + final DatanodeDetails datanodeFour = nodeIterator.next(); + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED); + final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( + containerOne.containerID(), containerTwo.containerID()) + .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( + containerOne.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); + final Set containerTwoReplicas = getReplicas( + containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); + + nodeManager.setContainers(datanodeOne, containerIDSet); + nodeManager.setContainers(datanodeTwo, containerIDSet); + nodeManager.setContainers(datanodeThree, containerIDSet); + + addContainerToContainerManager( + containerManager, containerOne, containerOneReplicas); + addContainerToContainerManager( + containerManager, containerTwo, containerTwoReplicas); + + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + if (args[0].equals(containerOne.containerID())) { + containerOneReplicas.add((ContainerReplica) args[1]); + } + return null; + }).when(containerManager).updateContainerReplica( + Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); + + + + // SCM expects both containerOne and containerTwo to be in all the three + // datanodes datanodeOne, datanodeTwo and datanodeThree + + // Now datanodeFour is sending container report which has containerOne. + + // containerOne becomes over replicated. + + final ContainerReportsProto containerReport = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne.getUuidString()); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final ContainerReportFromDatanode containerReportFromDatanode = + new ContainerReportFromDatanode(datanodeFour, containerReport); + reportHandler.onMessage(containerReportFromDatanode, publisher); + Mockito.verify(publisher, Mockito.times(1)) + .fireEvent(Mockito.any(), Mockito.any()); + + // TODO: verify whether are actually getting a replication request event + // for containerOne + } + + @Test + public void testOpenToClosing() + throws NodeNotFoundException, ContainerNotFoundException { + /* + * The container is in CLOSING state and all the replicas are either in + * OPEN or CLOSING state. + * + * The datanode reports that the replica is still in OPEN state. + * + * In this case SCM should trigger close container event to the datanode. + */ + + final NodeManager nodeManager = new MockNodeManager(true, 10); + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final ReplicationActivityStatus replicationActivityStatus = + new ReplicationActivityStatus(); + replicationActivityStatus.enableReplication(); + + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, pipelineManager, containerManager, + replicationActivityStatus); + final Iterator nodeIterator = nodeManager.getNodes( + NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final DatanodeDetails datanodeTwo = nodeIterator.next(); + final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING); + final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( + containerOne.containerID(), containerTwo.containerID()) + .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( + containerOne.containerID(), ContainerReplicaProto.State.OPEN, + datanodeOne); + + containerOneReplicas.addAll(getReplicas( + containerOne.containerID(), ContainerReplicaProto.State.CLOSING, + datanodeTwo, datanodeThree)); + + final Set containerTwoReplicas = getReplicas( + containerTwo.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); + + nodeManager.setContainers(datanodeOne, containerIDSet); + nodeManager.setContainers(datanodeTwo, containerIDSet); + nodeManager.setContainers(datanodeThree, containerIDSet); + + addContainerToContainerManager( + containerManager, containerOne, containerOneReplicas); + addContainerToContainerManager( + containerManager, containerTwo, containerTwoReplicas); + mockUpdateContainerReplica( + containerManager, containerOne, containerOneReplicas); + + // Replica in datanodeOne of containerOne is in OPEN state. + final ContainerReportsProto containerReport = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.OPEN, + datanodeOne.getUuidString()); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final ContainerReportFromDatanode containerReportFromDatanode = + new ContainerReportFromDatanode(datanodeOne, containerReport); + reportHandler.onMessage(containerReportFromDatanode, publisher); + + // Now we should get close container event for containerOne on datanodeOne + Mockito.verify(publisher, Mockito.times(1)) + .fireEvent(Mockito.any(), Mockito.any()); + + // TODO: verify whether are actually getting a close container + // datanode command for containerOne/datanodeOne + + /* + * The container is in CLOSING state and all the replicas are either in + * OPEN or CLOSING state. + * + * The datanode reports that the replica is in CLOSING state. + * + * In this case SCM should trigger close container event to the datanode. + */ + + // Replica in datanodeOne of containerOne is in OPEN state. + final ContainerReportsProto containerReportTwo = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.OPEN, + datanodeOne.getUuidString()); + final ContainerReportFromDatanode containerReportTwoFromDatanode = + new ContainerReportFromDatanode(datanodeOne, containerReportTwo); + reportHandler.onMessage(containerReportTwoFromDatanode, publisher); + + // Now we should get close container event for containerOne on datanodeOne + Mockito.verify(publisher, Mockito.times(2)) + .fireEvent(Mockito.any(), Mockito.any()); + + // TODO: verify whether are actually getting a close container + // datanode command for containerOne/datanodeOne + } + + @Test + public void testClosingToClosed() throws NodeNotFoundException, IOException { + /* + * The container is in CLOSING state and all the replicas are in + * OPEN/CLOSING state. + * + * The datanode reports that one of the replica is now CLOSED. + * + * In this case SCM should mark the container as CLOSED. + */ + final NodeManager nodeManager = new MockNodeManager(true, 10); + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final ReplicationActivityStatus replicationActivityStatus = + new ReplicationActivityStatus(); + replicationActivityStatus.enableReplication(); + + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, pipelineManager, containerManager, + replicationActivityStatus); + final Iterator nodeIterator = nodeManager.getNodes( + NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final DatanodeDetails datanodeTwo = nodeIterator.next(); + final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING); + final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( + containerOne.containerID(), containerTwo.containerID()) + .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( + containerOne.containerID(), + ContainerReplicaProto.State.CLOSING, + datanodeOne); + + containerOneReplicas.addAll(getReplicas( + containerOne.containerID(), + ContainerReplicaProto.State.OPEN, + datanodeTwo, datanodeThree)); + + final Set containerTwoReplicas = getReplicas( + containerTwo.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); + + nodeManager.setContainers(datanodeOne, containerIDSet); + nodeManager.setContainers(datanodeTwo, containerIDSet); + nodeManager.setContainers(datanodeThree, containerIDSet); + + addContainerToContainerManager( + containerManager, containerOne, containerOneReplicas); + addContainerToContainerManager( + containerManager, containerTwo, containerTwoReplicas); + mockUpdateContainerReplica( + containerManager, containerOne, containerOneReplicas); + mockUpdateContainerState(containerManager, containerOne, + LifeCycleEvent.CLOSE, LifeCycleState.CLOSED); + + final ContainerReportsProto containerReport = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeOne.getUuidString()); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final ContainerReportFromDatanode containerReportFromDatanode = + new ContainerReportFromDatanode(datanodeOne, containerReport); + reportHandler.onMessage(containerReportFromDatanode, publisher); + + Assert.assertEquals( + LifeCycleState.CLOSED, containerOne.getState()); + } + + @Test + public void testClosingToQuasiClosed() + throws NodeNotFoundException, IOException { + /* + * The container is in CLOSING state and all the replicas are in + * OPEN/CLOSING state. + * + * The datanode reports that the replica is now QUASI_CLOSED. + * + * In this case SCM should move the container to QUASI_CLOSED. + */ + final NodeManager nodeManager = new MockNodeManager(true, 10); + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final ReplicationActivityStatus replicationActivityStatus = + new ReplicationActivityStatus(); + replicationActivityStatus.enableReplication(); + + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, pipelineManager, containerManager, + replicationActivityStatus); + final Iterator nodeIterator = nodeManager.getNodes( + NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final DatanodeDetails datanodeTwo = nodeIterator.next(); + final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = + getContainer(LifeCycleState.CLOSING); + final ContainerInfo containerTwo = + getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( + containerOne.containerID(), containerTwo.containerID()) + .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( + containerOne.containerID(), + ContainerReplicaProto.State.CLOSING, + datanodeOne, datanodeTwo); + containerOneReplicas.addAll(getReplicas( + containerOne.containerID(), + ContainerReplicaProto.State.OPEN, + datanodeThree)); + final Set containerTwoReplicas = getReplicas( + containerTwo.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); + + nodeManager.setContainers(datanodeOne, containerIDSet); + nodeManager.setContainers(datanodeTwo, containerIDSet); + nodeManager.setContainers(datanodeThree, containerIDSet); + + addContainerToContainerManager( + containerManager, containerOne, containerOneReplicas); + addContainerToContainerManager( + containerManager, containerTwo, containerTwoReplicas); + mockUpdateContainerReplica( + containerManager, containerOne, containerOneReplicas); + mockUpdateContainerState(containerManager, containerOne, + LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED); + + final ContainerReportsProto containerReport = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, + datanodeOne.getUuidString()); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final ContainerReportFromDatanode containerReportFromDatanode = + new ContainerReportFromDatanode(datanodeOne, containerReport); + reportHandler.onMessage(containerReportFromDatanode, publisher); + + Assert.assertEquals( + LifeCycleState.QUASI_CLOSED, containerOne.getState()); + } + + @Test + public void testQuasiClosedWithDifferentOriginNodeReplica() + throws NodeNotFoundException, IOException { + /* + * The container is in QUASI_CLOSED state. + * - One of the replica is in QUASI_CLOSED state + * - The other two replica are in OPEN/CLOSING state + * + * The datanode reports the second replica is now QUASI_CLOSED. + * + * In this case SCM should CLOSE the container with highest BCSID and + * send force close command to the datanode. + */ + final NodeManager nodeManager = new MockNodeManager(true, 10); + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final ReplicationActivityStatus replicationActivityStatus = + new ReplicationActivityStatus(); + replicationActivityStatus.enableReplication(); + + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, pipelineManager, containerManager, + replicationActivityStatus); + final Iterator nodeIterator = nodeManager.getNodes( + NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final DatanodeDetails datanodeTwo = nodeIterator.next(); + final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = + getContainer(LifeCycleState.QUASI_CLOSED); + final ContainerInfo containerTwo = + getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( + containerOne.containerID(), containerTwo.containerID()) + .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( + containerOne.containerID(), + ContainerReplicaProto.State.QUASI_CLOSED, + 10000L, + datanodeOne); + containerOneReplicas.addAll(getReplicas( + containerOne.containerID(), + ContainerReplicaProto.State.CLOSING, + datanodeTwo, datanodeThree)); + final Set containerTwoReplicas = getReplicas( + containerTwo.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); + + nodeManager.setContainers(datanodeOne, containerIDSet); + nodeManager.setContainers(datanodeTwo, containerIDSet); + nodeManager.setContainers(datanodeThree, containerIDSet); + + addContainerToContainerManager( + containerManager, containerOne, containerOneReplicas); + addContainerToContainerManager( + containerManager, containerTwo, containerTwoReplicas); + mockUpdateContainerReplica( + containerManager, containerOne, containerOneReplicas); + mockUpdateContainerState(containerManager, containerOne, + LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED); + + // Container replica with datanodeOne as originNodeId is already + // QUASI_CLOSED. Now we will tell SCM that container replica from + // datanodeTwo is also QUASI_CLOSED, but has higher sequenceId. + final ContainerReportsProto containerReport = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, + datanodeTwo.getUuidString(), 999999L); + + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final ContainerReportFromDatanode containerReportFromDatanode = + new ContainerReportFromDatanode(datanodeTwo, containerReport); + reportHandler.onMessage(containerReportFromDatanode, publisher); + + // Now we should get force close container event for containerOne on + // datanodeTwo + Mockito.verify(publisher, Mockito.times(1)) + .fireEvent(Mockito.any(), Mockito.any()); + // TODO: verify whether are actually getting a force close container + // datanode command for containerOne/datanodeTwo + + // The sequence id of the container should have been updated. + Assert.assertEquals(999999L, containerOne.getSequenceId()); + + // Now datanodeTwo should close containerOne. + final ContainerReportsProto containerReportTwo = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.CLOSED, + datanodeTwo.getUuidString(), 999999L); + final ContainerReportFromDatanode containerReportFromDatanodeTwo = + new ContainerReportFromDatanode(datanodeTwo, containerReportTwo); + reportHandler.onMessage(containerReportFromDatanodeTwo, publisher); + + // The container should be closed in SCM now. + Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState()); + } + + @Test + public void testQuasiClosedWithSameOriginNodeReplica() + throws NodeNotFoundException, IOException { + /* + * The container is in QUASI_CLOSED state. + * - One of the replica is in QUASI_CLOSED state + * - The other two replica are in OPEN/CLOSING state + * + * The datanode reports a QUASI_CLOSED replica which has the same + * origin node id as the existing QUASI_CLOSED replica. + * + * In this case SCM should not CLOSE the container. + */ + final NodeManager nodeManager = new MockNodeManager(true, 10); + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final ReplicationActivityStatus replicationActivityStatus = + new ReplicationActivityStatus(); + replicationActivityStatus.enableReplication(); + + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final ContainerReportHandler reportHandler = new ContainerReportHandler( + nodeManager, pipelineManager, containerManager, + replicationActivityStatus); + final Iterator nodeIterator = nodeManager.getNodes( + NodeState.HEALTHY).iterator(); + final DatanodeDetails datanodeOne = nodeIterator.next(); + final DatanodeDetails datanodeTwo = nodeIterator.next(); + final DatanodeDetails datanodeThree = nodeIterator.next(); + final ContainerInfo containerOne = + getContainer(LifeCycleState.QUASI_CLOSED); + final ContainerInfo containerTwo = + getContainer(LifeCycleState.CLOSED); + final Set containerIDSet = Stream.of( + containerOne.containerID(), containerTwo.containerID()) + .collect(Collectors.toSet()); + final Set containerOneReplicas = getReplicas( + containerOne.containerID(), + ContainerReplicaProto.State.QUASI_CLOSED, + datanodeOne); + containerOneReplicas.addAll(getReplicas( + containerOne.containerID(), + ContainerReplicaProto.State.CLOSING, + datanodeTwo)); + final Set containerTwoReplicas = getReplicas( + containerTwo.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne, datanodeTwo, datanodeThree); + + nodeManager.setContainers(datanodeOne, containerIDSet); + nodeManager.setContainers(datanodeTwo, containerIDSet); + nodeManager.setContainers(datanodeThree, + Collections.singleton(containerTwo.containerID())); + + addContainerToContainerManager( + containerManager, containerOne, containerOneReplicas); + addContainerToContainerManager( + containerManager, containerTwo, containerTwoReplicas); + + mockUpdateContainerReplica( + containerManager, containerOne, containerOneReplicas); + mockUpdateContainerState(containerManager, containerOne, + LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED); + + // containerOne is QUASI_CLOSED in datanodeOne and CLOSING in datanodeTwo. + // Now datanodeThree is sending container report which says that it has + // containerOne replica, but the originNodeId of this replica is + // datanodeOne. In this case we should not force close the container even + // though we got two QUASI_CLOSED replicas. + final ContainerReportsProto containerReport = getContainerReportsProto( + containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED, + datanodeOne.getUuidString()); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final ContainerReportFromDatanode containerReportFromDatanode = + new ContainerReportFromDatanode(datanodeThree, containerReport); + reportHandler.onMessage(containerReportFromDatanode, publisher); + + Mockito.verify(publisher, Mockito.times(0)) + .fireEvent(Mockito.any(), Mockito.any()); + } + + private static ContainerReportsProto getContainerReportsProto( + final ContainerID containerId, final ContainerReplicaProto.State state, + final String originNodeId) { + return getContainerReportsProto(containerId, state, originNodeId, 100L); + } + + private static ContainerReportsProto getContainerReportsProto( + final ContainerID containerId, final ContainerReplicaProto.State state, + final String originNodeId, final long bcsid) { + final ContainerReportsProto.Builder crBuilder = ContainerReportsProto.newBuilder(); - - for (long containerId : containerIds) { - org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos - .ContainerReplicaProto.Builder - ciBuilder = org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos - .ContainerReplicaProto.newBuilder(); - ciBuilder.setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") - .setSize(5368709120L) - .setUsed(2000000000L) - .setKeyCount(100000000L) - .setReadCount(100000000L) - .setWriteCount(100000000L) - .setReadBytes(2000000000L) - .setWriteBytes(2000000000L) - .setContainerID(containerId) - .setDeleteTransactionId(0); - - crBuilder.addReports(ciBuilder.build()); - } - - return crBuilder.build(); + final ContainerReplicaProto replicaProto = + ContainerReplicaProto.newBuilder() + .setContainerID(containerId.getId()) + .setState(state) + .setOriginNodeId(originNodeId) + .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") + .setSize(5368709120L) + .setUsed(2000000000L) + .setKeyCount(100000000L) + .setReadCount(100000000L) + .setWriteCount(100000000L) + .setReadBytes(2000000000L) + .setWriteBytes(2000000000L) + .setBlockCommitSequenceId(bcsid) + .setDeleteTransactionId(0) + .build(); + return crBuilder.addReports(replicaProto).build(); } - @Override - public > void fireEvent( - EVENT_TYPE event, PAYLOAD payload) { - LOG.info("Event is published: {}", payload); - publishedEvents.add(payload); - } } \ No newline at end of file diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java new file mode 100644 index 0000000000..0fb50a426a --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHelper.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.commons.lang3.RandomUtils; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * Helper methods for testing ContainerReportHandler and + * IncrementalContainerReportHandler. + */ +public final class TestContainerReportHelper { + + private TestContainerReportHelper() {} + + static void addContainerToContainerManager( + final ContainerManager containerManager, final ContainerInfo container, + final Set replicas) throws ContainerNotFoundException { + Mockito.when(containerManager.getContainer(container.containerID())) + .thenReturn(container); + Mockito.when( + containerManager.getContainerReplicas(container.containerID())) + .thenReturn(replicas); + } + + static void mockUpdateContainerReplica( + final ContainerManager containerManager, + final ContainerInfo containerInfo, final Set replicas) + throws ContainerNotFoundException { + Mockito.doAnswer((Answer) invocation -> { + Object[] args = invocation.getArguments(); + if (args[0].equals(containerInfo.containerID())) { + ContainerReplica replica = (ContainerReplica) args[1]; + replicas.remove(replica); + replicas.add(replica); + } + return null; + }).when(containerManager).updateContainerReplica( + Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class)); + } + + static void mockUpdateContainerState( + final ContainerManager containerManager, + final ContainerInfo containerInfo, + final LifeCycleEvent event, final LifeCycleState state) + throws IOException { + Mockito.doAnswer((Answer) invocation -> { + containerInfo.setState(state); + return containerInfo.getState(); + }).when(containerManager).updateContainerState( + containerInfo.containerID(), event); + } + + public static ContainerInfo getContainer(final LifeCycleState state) { + return new ContainerInfo.Builder() + .setContainerID(RandomUtils.nextLong()) + .setReplicationType(ReplicationType.RATIS) + .setReplicationFactor(ReplicationFactor.THREE) + .setState(state) + .build(); + } + + static Set getReplicas( + final ContainerID containerId, + final ContainerReplicaProto.State state, + final DatanodeDetails... datanodeDetails) { + return getReplicas(containerId, state, 10000L, datanodeDetails); + } + + static Set getReplicas( + final ContainerID containerId, + final ContainerReplicaProto.State state, + final long sequenceId, + final DatanodeDetails... datanodeDetails) { + Set replicas = new HashSet<>(); + for (DatanodeDetails datanode : datanodeDetails) { + replicas.add(ContainerReplica.newBuilder() + .setContainerID(containerId) + .setContainerState(state) + .setDatanodeDetails(datanode) + .setOriginNodeId(datanode.getUuid()) + .setSequenceId(sequenceId) + .build()); + } + return replicas; + } +} diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java new file mode 100644 index 0000000000..23e96dd542 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestIncrementalContainerReportHandler.java @@ -0,0 +1,218 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager; +import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher + .IncrementalContainerReportFromDatanode; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Set; + +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.addContainerToContainerManager; +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.getContainer; +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.getReplicas; +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.mockUpdateContainerReplica; +import static org.apache.hadoop.hdds.scm.container + .TestContainerReportHelper.mockUpdateContainerState; + +/** + * Test cases to verify the functionality of IncrementalContainerReportHandler. + */ +public class TestIncrementalContainerReportHandler { + + @Test + public void testClosingToClosed() throws IOException { + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final IncrementalContainerReportHandler reportHandler = + new IncrementalContainerReportHandler( + pipelineManager, containerManager); + final ContainerInfo container = getContainer(LifeCycleState.CLOSING); + final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails(); + final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails(); + final DatanodeDetails datanodeThree = TestUtils.randomDatanodeDetails(); + final Set containerReplicas = getReplicas( + container.containerID(), + ContainerReplicaProto.State.CLOSING, + datanodeOne, datanodeTwo, datanodeThree); + + addContainerToContainerManager( + containerManager, container, containerReplicas); + mockUpdateContainerReplica( + containerManager, container, containerReplicas); + mockUpdateContainerState(containerManager, container, + LifeCycleEvent.CLOSE, LifeCycleState.CLOSED); + + final IncrementalContainerReportProto containerReport = + getIncrementalContainerReportProto(container.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne.getUuidString()); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final IncrementalContainerReportFromDatanode icrFromDatanode = + new IncrementalContainerReportFromDatanode( + datanodeOne, containerReport); + reportHandler.onMessage(icrFromDatanode, publisher); + Assert.assertEquals( + LifeCycleState.CLOSED, container.getState()); + } + + @Test + public void testClosingToQuasiClosed() throws IOException { + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final IncrementalContainerReportHandler reportHandler = + new IncrementalContainerReportHandler( + pipelineManager, containerManager); + final ContainerInfo container = getContainer(LifeCycleState.CLOSING); + final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails(); + final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails(); + final DatanodeDetails datanodeThree = TestUtils.randomDatanodeDetails(); + final Set containerReplicas = getReplicas( + container.containerID(), + ContainerReplicaProto.State.CLOSING, + datanodeOne, datanodeTwo, datanodeThree); + + addContainerToContainerManager( + containerManager, container, containerReplicas); + mockUpdateContainerReplica( + containerManager, container, containerReplicas); + mockUpdateContainerState(containerManager, container, + LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED); + + final IncrementalContainerReportProto containerReport = + getIncrementalContainerReportProto(container.containerID(), + ContainerReplicaProto.State.QUASI_CLOSED, + datanodeOne.getUuidString()); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final IncrementalContainerReportFromDatanode icrFromDatanode = + new IncrementalContainerReportFromDatanode( + datanodeOne, containerReport); + reportHandler.onMessage(icrFromDatanode, publisher); + Assert.assertEquals( + LifeCycleState.QUASI_CLOSED, container.getState()); + } + + @Test + public void testQuasiClosedToClosed() throws IOException { + final ContainerManager containerManager = Mockito.mock( + ContainerManager.class); + final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class); + final IncrementalContainerReportHandler reportHandler = + new IncrementalContainerReportHandler( + pipelineManager, containerManager); + final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED); + final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails(); + final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails(); + final DatanodeDetails datanodeThree = TestUtils.randomDatanodeDetails(); + final Set containerReplicas = getReplicas( + container.containerID(), + ContainerReplicaProto.State.CLOSING, + datanodeOne, datanodeTwo); + containerReplicas.addAll(getReplicas( + container.containerID(), + ContainerReplicaProto.State.QUASI_CLOSED, + datanodeThree)); + + + addContainerToContainerManager( + containerManager, container, containerReplicas); + mockUpdateContainerReplica( + containerManager, container, containerReplicas); + mockUpdateContainerState(containerManager, container, + LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED); + + final IncrementalContainerReportProto containerReport = + getIncrementalContainerReportProto(container.containerID(), + ContainerReplicaProto.State.QUASI_CLOSED, + datanodeOne.getUuidString(), 999999L); + final EventPublisher publisher = Mockito.mock(EventPublisher.class); + final IncrementalContainerReportFromDatanode icrFromDatanode = + new IncrementalContainerReportFromDatanode( + datanodeOne, containerReport); + reportHandler.onMessage(icrFromDatanode, publisher); + + // SCM should issue force close. + Mockito.verify(publisher, Mockito.times(1)) + .fireEvent(Mockito.any(), Mockito.any()); + + final IncrementalContainerReportProto containerReportTwo = + getIncrementalContainerReportProto(container.containerID(), + ContainerReplicaProto.State.CLOSED, + datanodeOne.getUuidString(), 999999L); + final IncrementalContainerReportFromDatanode icrTwoFromDatanode = + new IncrementalContainerReportFromDatanode( + datanodeOne, containerReportTwo); + reportHandler.onMessage(icrTwoFromDatanode, publisher); + Assert.assertEquals( + LifeCycleState.CLOSED, container.getState()); + } + + private static IncrementalContainerReportProto + getIncrementalContainerReportProto( + final ContainerID containerId, + final ContainerReplicaProto.State state, + final String originNodeId) { + return getIncrementalContainerReportProto( + containerId, state, originNodeId, 100L); + } + + private static IncrementalContainerReportProto + getIncrementalContainerReportProto( + final ContainerID containerId, + final ContainerReplicaProto.State state, + final String originNodeId, final long bcsid) { + final IncrementalContainerReportProto.Builder crBuilder = + IncrementalContainerReportProto.newBuilder(); + final ContainerReplicaProto replicaProto = + ContainerReplicaProto.newBuilder() + .setContainerID(containerId.getId()) + .setState(state) + .setOriginNodeId(originNodeId) + .setFinalhash("e16cc9d6024365750ed8dbd194ea46d2") + .setSize(5368709120L) + .setUsed(2000000000L) + .setKeyCount(100000000L) + .setReadCount(100000000L) + .setWriteCount(100000000L) + .setReadBytes(2000000000L) + .setWriteBytes(2000000000L) + .setBlockCommitSequenceId(bcsid) + .setDeleteTransactionId(0) + .build(); + return crBuilder.addReport(replicaProto).build(); + } +} \ No newline at end of file