HDDS-1207. Refactor Container Report Processing logic and plugin new Replication Manager. (#662)

This commit is contained in:
Nanda kumar 2019-04-04 16:32:59 +05:30 committed by GitHub
parent 945e8c6064
commit 48a58bce37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 726 additions and 1241 deletions

View File

@ -108,13 +108,6 @@ public class ContainerInfo implements Comparator<ContainerInfo>,
this.replicationType = repType;
}
public ContainerInfo(ContainerInfo info) {
this(info.getContainerID(), info.getState(), info.getPipelineID(),
info.getUsedBytes(), info.getNumberOfKeys(),
info.getStateEnterTime(), info.getOwner(),
info.getDeleteTransactionId(), info.getSequenceId(),
info.getReplicationFactor(), info.getReplicationType());
}
/**
* Needed for serialization findbugs.
*/

View File

@ -20,8 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.replication.
ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager.ChillModeStatus;
import org.apache.hadoop.hdds.server.events.EventHandler;
@ -41,7 +40,7 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
private final BlockManager scmBlockManager;
private final long waitTime;
private final AtomicBoolean isInChillMode = new AtomicBoolean(true);
private final ReplicationActivityStatus replicationActivityStatus;
private final ReplicationManager replicationManager;
/**
@ -49,27 +48,27 @@ public class ChillModeHandler implements EventHandler<ChillModeStatus> {
* @param configuration
* @param clientProtocolServer
* @param blockManager
* @param replicationStatus
* @param replicationManager
*/
public ChillModeHandler(Configuration configuration,
SCMClientProtocolServer clientProtocolServer,
BlockManager blockManager,
ReplicationActivityStatus replicationStatus) {
ReplicationManager replicationManager) {
Objects.requireNonNull(configuration, "Configuration cannot be null");
Objects.requireNonNull(clientProtocolServer, "SCMClientProtocolServer " +
"object cannot be null");
Objects.requireNonNull(blockManager, "BlockManager object cannot be null");
Objects.requireNonNull(replicationStatus, "ReplicationActivityStatus " +
Objects.requireNonNull(replicationManager, "ReplicationManager " +
"object cannot be null");
this.waitTime = configuration.getTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT,
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT_DEFAULT,
TimeUnit.MILLISECONDS);
scmClientProtocolServer = clientProtocolServer;
scmBlockManager = blockManager;
replicationActivityStatus = replicationStatus;
this.scmClientProtocolServer = clientProtocolServer;
this.scmBlockManager = blockManager;
this.replicationManager = replicationManager;
boolean chillModeEnabled = configuration.getBoolean(
final boolean chillModeEnabled = configuration.getBoolean(
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_CHILLMODE_ENABLED_DEFAULT);
isInChillMode.set(chillModeEnabled);
@ -89,13 +88,16 @@ public ChillModeHandler(Configuration configuration,
@Override
public void onMessage(ChillModeStatus chillModeStatus,
EventPublisher publisher) {
isInChillMode.set(chillModeStatus.getChillModeStatus());
replicationActivityStatus.fireReplicationStart(isInChillMode.get(),
waitTime);
scmClientProtocolServer.setChillModeStatus(isInChillMode.get());
scmBlockManager.setChillModeStatus(isInChillMode.get());
try {
isInChillMode.set(chillModeStatus.getChillModeStatus());
scmClientProtocolServer.setChillModeStatus(isInChillMode.get());
scmBlockManager.setChillModeStatus(isInChillMode.get());
Thread.sleep(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
replicationManager.start();
}
}
public boolean getChillModeStatus() {

View File

@ -0,0 +1,236 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.UUID;
import java.util.function.Supplier;
/**
* Base class for all the container report handlers.
*/
public class AbstractContainerReportHandler {
private final ContainerManager containerManager;
private final Logger logger;
/**
* Constructs AbstractContainerReportHandler instance with the
* given ContainerManager instance.
*
* @param containerManager ContainerManager
* @param logger Logger to be used for logging
*/
AbstractContainerReportHandler(final ContainerManager containerManager,
final Logger logger) {
Preconditions.checkNotNull(containerManager);
Preconditions.checkNotNull(logger);
this.containerManager = containerManager;
this.logger = logger;
}
/**
* Process the given ContainerReplica received from specified datanode.
*
* @param datanodeDetails DatanodeDetails of the node which reported
* this replica
* @param replicaProto ContainerReplica
*
* @throws IOException In case of any Exception while processing the report
*/
void processContainerReplica(final DatanodeDetails datanodeDetails,
final ContainerReplicaProto replicaProto)
throws IOException {
final ContainerID containerId = ContainerID
.valueof(replicaProto.getContainerID());
final ContainerReplica replica = ContainerReplica.newBuilder()
.setContainerID(containerId)
.setContainerState(replicaProto.getState())
.setDatanodeDetails(datanodeDetails)
.setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
.setSequenceId(replicaProto.getBlockCommitSequenceId())
.build();
logger.debug("Processing replica of container {} from datanode {}",
containerId, datanodeDetails);
// Synchronized block should be replaced by container lock,
// once we have introduced lock inside ContainerInfo.
synchronized (containerManager.getContainer(containerId)) {
updateContainerStats(containerId, replicaProto);
updateContainerState(datanodeDetails, containerId, replica);
containerManager.updateContainerReplica(containerId, replica);
}
}
/**
* Update the container stats if it's lagging behind the stats in reported
* replica.
*
* @param containerId ID of the container
* @param replicaProto Container Replica information
* @throws ContainerNotFoundException If the container is not present
*/
private void updateContainerStats(final ContainerID containerId,
final ContainerReplicaProto replicaProto)
throws ContainerNotFoundException {
if (!isUnhealthy(replicaProto::getState)) {
final ContainerInfo containerInfo = containerManager
.getContainer(containerId);
if (containerInfo.getSequenceId() <
replicaProto.getBlockCommitSequenceId()) {
containerInfo.updateSequenceId(
replicaProto.getBlockCommitSequenceId());
}
if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
containerInfo.setUsedBytes(replicaProto.getUsed());
}
if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
}
}
}
/**
* Updates the container state based on the given replica state.
*
* @param datanode Datanode from which the report is received
* @param containerId ID of the container
* @param replica ContainerReplica
* @throws IOException In case of Exception
*/
private void updateContainerState(final DatanodeDetails datanode,
final ContainerID containerId,
final ContainerReplica replica)
throws IOException {
final ContainerInfo container = containerManager
.getContainer(containerId);
switch (container.getState()) {
case OPEN:
/*
* If the state of a container is OPEN, datanodes cannot report
* any other state.
*/
if (replica.getState() != State.OPEN) {
logger.warn("Container {} is in OPEN state, but the datanode {} " +
"reports an {} replica.", containerId,
datanode, replica.getState());
// Should we take some action?
}
break;
case CLOSING:
/*
* When the container is in CLOSING state the replicas can be in any
* of the following states:
*
* - OPEN
* - CLOSING
* - QUASI_CLOSED
* - CLOSED
*
* If all the replica are either in OPEN or CLOSING state, do nothing.
*
* If the replica is in QUASI_CLOSED state, move the container to
* QUASI_CLOSED state.
*
* If the replica is in CLOSED state, mark the container as CLOSED.
*
*/
if (replica.getState() == State.QUASI_CLOSED) {
logger.info("Moving container {} to QUASI_CLOSED state, datanode {} " +
"reported QUASI_CLOSED replica.", containerId, datanode);
containerManager.updateContainerState(containerId,
LifeCycleEvent.QUASI_CLOSE);
}
if (replica.getState() == State.CLOSED) {
logger.info("Moving container {} to CLOSED state, datanode {} " +
"reported CLOSED replica.", containerId, datanode);
Preconditions.checkArgument(replica.getSequenceId()
== container.getSequenceId());
containerManager.updateContainerState(containerId,
LifeCycleEvent.CLOSE);
}
break;
case QUASI_CLOSED:
/*
* The container is in QUASI_CLOSED state, this means that at least
* one of the replica was QUASI_CLOSED.
*
* Now replicas can be in any of the following state.
*
* 1. OPEN
* 2. CLOSING
* 3. QUASI_CLOSED
* 4. CLOSED
*
* If at least one of the replica is in CLOSED state, mark the
* container as CLOSED.
*
*/
if (replica.getState() == State.CLOSED) {
logger.info("Moving container {} to CLOSED state, datanode {} " +
"reported CLOSED replica.", containerId, datanode);
Preconditions.checkArgument(replica.getSequenceId()
== container.getSequenceId());
containerManager.updateContainerState(containerId,
LifeCycleEvent.FORCE_CLOSE);
}
break;
case CLOSED:
/*
* The container is already in closed state. do nothing.
*/
break;
case DELETING:
throw new UnsupportedOperationException(
"Unsupported container state 'DELETING'.");
case DELETED:
throw new UnsupportedOperationException(
"Unsupported container state 'DELETED'.");
default:
break;
}
}
/**
* Returns true if the container replica is not marked as UNHEALTHY.
*
* @param replicaState State of the container replica.
* @return true if unhealthy, false otherwise
*/
private boolean isUnhealthy(final Supplier<State> replicaState) {
return replicaState.get() == ContainerReplicaProto.State.UNHEALTHY;
}
}

View File

@ -15,13 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
@ -29,115 +24,85 @@
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Handles container reports from datanode.
*/
public class ContainerReportHandler implements
EventHandler<ContainerReportFromDatanode> {
public class ContainerReportHandler extends AbstractContainerReportHandler
implements EventHandler<ContainerReportFromDatanode> {
private static final Logger LOG =
LoggerFactory.getLogger(ContainerReportHandler.class);
private final NodeManager nodeManager;
private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
private final ReplicationActivityStatus replicationStatus;
/**
* Constructs ContainerReportHandler instance with the
* given NodeManager and ContainerManager instance.
*
* @param nodeManager NodeManager instance
* @param containerManager ContainerManager instance
*/
public ContainerReportHandler(final NodeManager nodeManager,
final PipelineManager pipelineManager,
final ContainerManager containerManager,
final ReplicationActivityStatus replicationActivityStatus) {
Preconditions.checkNotNull(nodeManager);
Preconditions.checkNotNull(pipelineManager);
Preconditions.checkNotNull(containerManager);
Preconditions.checkNotNull(replicationActivityStatus);
final ContainerManager containerManager) {
super(containerManager, LOG);
this.nodeManager = nodeManager;
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
this.replicationStatus = replicationActivityStatus;
}
/**
* Process the container reports from datanodes.
*
* @param reportFromDatanode Container Report
* @param publisher EventPublisher reference
*/
@Override
public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
final EventPublisher publisher) {
final EventPublisher publisher) {
final DatanodeDetails datanodeDetails =
reportFromDatanode.getDatanodeDetails();
final ContainerReportsProto containerReport =
reportFromDatanode.getReport();
try {
final List<ContainerReplicaProto> replicas =
containerReport.getReportsList();
final Set<ContainerID> containersInSCM =
nodeManager.getContainers(datanodeDetails);
final List<ContainerReplicaProto> replicas = containerReport
.getReportsList();
// ContainerIDs which SCM expects this datanode to have.
final Set<ContainerID> expectedContainerIDs = nodeManager
.getContainers(datanodeDetails);
// ContainerIDs that this datanode actually has.
final Set<ContainerID> actualContainerIDs = replicas.parallelStream()
final Set<ContainerID> containersInDn = replicas.parallelStream()
.map(ContainerReplicaProto::getContainerID)
.map(ContainerID::valueof).collect(Collectors.toSet());
// Container replicas which SCM is not aware of.
final Set<ContainerID> newReplicas =
new HashSet<>(actualContainerIDs);
newReplicas.removeAll(expectedContainerIDs);
final Set<ContainerID> missingReplicas = new HashSet<>(containersInSCM);
missingReplicas.removeAll(containersInDn);
// Container replicas which are missing from datanode.
final Set<ContainerID> missingReplicas =
new HashSet<>(expectedContainerIDs);
missingReplicas.removeAll(actualContainerIDs);
processContainerReplicas(datanodeDetails, replicas);
processMissingReplicas(datanodeDetails, missingReplicas);
updateDeleteTransaction(datanodeDetails, replicas, publisher);
processContainerReplicas(datanodeDetails, replicas, publisher);
// Remove missing replica from ContainerManager
for (ContainerID id : missingReplicas) {
try {
containerManager.getContainerReplicas(id)
.stream()
.filter(replica ->
replica.getDatanodeDetails().equals(datanodeDetails))
.findFirst()
.ifPresent(replica -> {
try {
containerManager.removeContainerReplica(id, replica);
} catch (ContainerNotFoundException |
ContainerReplicaNotFoundException e) {
// This should not happen, but even if it happens, not an
// issue
}
});
} catch (ContainerNotFoundException e) {
LOG.warn("Cannot remove container replica, container {} not found {}",
id, e);
}
}
// Update the latest set of containers for this datanode in NodeManager.
nodeManager.setContainers(datanodeDetails, actualContainerIDs);
// Replicate if needed.
newReplicas.forEach(id -> checkReplicationState(id, publisher));
missingReplicas.forEach(id -> checkReplicationState(id, publisher));
/*
* Update the latest set of containers for this datanode in
* NodeManager
*/
nodeManager.setContainers(datanodeDetails, containersInDn);
} catch (NodeNotFoundException ex) {
LOG.error("Received container report from unknown datanode {} {}",
@ -146,37 +111,84 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
}
/**
* Processes the ContainerReport.
*
* @param datanodeDetails Datanode from which this report was received
* @param replicas list of ContainerReplicaProto
*/
private void processContainerReplicas(final DatanodeDetails datanodeDetails,
final List<ContainerReplicaProto> replicas) {
for (ContainerReplicaProto replicaProto : replicas) {
try {
processContainerReplica(datanodeDetails, replicaProto);
} catch (ContainerNotFoundException e) {
LOG.error("Received container report for an unknown container" +
" {} from datanode {}.", replicaProto.getContainerID(),
datanodeDetails, e);
} catch (IOException e) {
LOG.error("Exception while processing container report for container" +
" {} from datanode {}.", replicaProto.getContainerID(),
datanodeDetails, e);
}
}
}
/**
* Process the missing replica on the given datanode.
*
* @param datanodeDetails DatanodeDetails
* @param missingReplicas ContainerID which are missing on the given datanode
*/
private void processMissingReplicas(final DatanodeDetails datanodeDetails,
final Set<ContainerID> missingReplicas) {
for (ContainerID id : missingReplicas) {
try {
containerManager.getContainerReplicas(id).stream()
.filter(replica -> replica.getDatanodeDetails()
.equals(datanodeDetails)).findFirst()
.ifPresent(replica -> {
try {
containerManager.removeContainerReplica(id, replica);
} catch (ContainerNotFoundException |
ContainerReplicaNotFoundException ignored) {
// This should not happen, but even if it happens, not an issue
}
});
} catch (ContainerNotFoundException e) {
LOG.warn("Cannot remove container replica, container {} not found.",
id, e);
}
}
}
/**
* Updates the Delete Transaction Id for the given datanode.
*
* @param datanodeDetails DatanodeDetails
* @param replicas List of ContainerReplicaProto
* @param publisher EventPublisher reference
*/
private void updateDeleteTransaction(final DatanodeDetails datanodeDetails,
final List<ContainerReplicaProto> replicas,
final EventPublisher publisher) {
final PendingDeleteStatusList pendingDeleteStatusList =
new PendingDeleteStatusList(datanodeDetails);
for (ContainerReplicaProto replicaProto : replicas) {
for (ContainerReplicaProto replica : replicas) {
try {
final ContainerID containerID = ContainerID.valueof(
replicaProto.getContainerID());
ReportHandlerHelper.processContainerReplica(containerManager,
containerID, replicaProto, datanodeDetails, publisher, LOG);
final ContainerInfo containerInfo = containerManager
.getContainer(containerID);
final ContainerInfo containerInfo = containerManager.getContainer(
ContainerID.valueof(replica.getContainerID()));
if (containerInfo.getDeleteTransactionId() >
replicaProto.getDeleteTransactionId()) {
pendingDeleteStatusList
.addPendingDeleteStatus(replicaProto.getDeleteTransactionId(),
containerInfo.getDeleteTransactionId(),
containerInfo.getContainerID());
replica.getDeleteTransactionId()) {
pendingDeleteStatusList.addPendingDeleteStatus(
replica.getDeleteTransactionId(),
containerInfo.getDeleteTransactionId(),
containerInfo.getContainerID());
}
} catch (ContainerNotFoundException e) {
LOG.error("Received container report for an unknown container {} from"
+ " datanode {} {}", replicaProto.getContainerID(),
datanodeDetails, e);
} catch (IOException e) {
LOG.error("Exception while processing container report for container"
+ " {} from datanode {} {}", replicaProto.getContainerID(),
datanodeDetails, e);
} catch (ContainerNotFoundException cnfe) {
LOG.warn("Cannot update pending delete transaction for " +
"container #{}. Reason: container missing.",
replica.getContainerID());
}
}
if (pendingDeleteStatusList.getNumPendingDeletes() > 0) {
@ -184,30 +196,4 @@ private void processContainerReplicas(final DatanodeDetails datanodeDetails,
pendingDeleteStatusList);
}
}
private void checkReplicationState(ContainerID containerID,
EventPublisher publisher) {
try {
ContainerInfo container = containerManager.getContainer(containerID);
replicateIfNeeded(container, publisher);
} catch (ContainerNotFoundException ex) {
LOG.warn("Container is missing from containerStateManager. Can't request "
+ "replication. {} {}", containerID, ex);
}
}
private void replicateIfNeeded(ContainerInfo container,
EventPublisher publisher) throws ContainerNotFoundException {
if (!container.isOpen() && replicationStatus.isReplicationEnabled()) {
final int existingReplicas = containerManager
.getContainerReplicas(container.containerID()).size();
final int expectedReplicas = container.getReplicationFactor().getNumber();
if (existingReplicas != expectedReplicas) {
publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
new ReplicationRequest(container.getContainerID(),
existingReplicas, expectedReplicas));
}
}
}
}

View File

@ -310,20 +310,19 @@ ContainerInfo allocateContainer(
*
* @param containerID - ContainerID
* @param event - LifeCycle Event
* @return Updated ContainerInfo.
* @throws SCMException on Failure.
*/
ContainerInfo updateContainerState(final ContainerID containerID,
void updateContainerState(final ContainerID containerID,
final HddsProtos.LifeCycleEvent event)
throws SCMException, ContainerNotFoundException {
final ContainerInfo info = containers.getContainerInfo(containerID);
try {
final LifeCycleState oldState = info.getState();
final LifeCycleState newState = stateMachine.getNextState(
info.getState(), event);
containers.updateState(containerID, info.getState(), newState);
containerStateCount.incrementAndGet(newState);
containerStateCount.decrementAndGet(info.getState());
return containers.getContainerInfo(containerID);
containerStateCount.decrementAndGet(oldState);
} catch (InvalidStateTransitionException ex) {
String error = String.format("Failed to update container state %s, " +
"reason: invalid state transition from state: %s upon " +
@ -334,18 +333,6 @@ ContainerInfo updateContainerState(final ContainerID containerID,
}
}
/**
* Update the container State.
* @param info - Container Info
* @return ContainerInfo
* @throws SCMException - on Error.
*/
ContainerInfo updateContainerInfo(final ContainerInfo info)
throws ContainerNotFoundException {
containers.updateContainerInfo(info);
return containers.getContainerInfo(info.containerID());
}
/**
* Update deleteTransactionId for a container.
*

View File

@ -18,55 +18,40 @@
package org.apache.hadoop.hdds.scm.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
import java.io.IOException;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Handles incremental container reports from datanode.
*/
public class IncrementalContainerReportHandler implements
EventHandler<IncrementalContainerReportFromDatanode> {
public class IncrementalContainerReportHandler extends
AbstractContainerReportHandler
implements EventHandler<IncrementalContainerReportFromDatanode> {
private static final Logger LOG =
LoggerFactory.getLogger(IncrementalContainerReportHandler.class);
private final PipelineManager pipelineManager;
private final ContainerManager containerManager;
private static final Logger LOG = LoggerFactory.getLogger(
IncrementalContainerReportHandler.class);
public IncrementalContainerReportHandler(
final PipelineManager pipelineManager,
final ContainerManager containerManager) {
Preconditions.checkNotNull(pipelineManager);
Preconditions.checkNotNull(containerManager);
this.pipelineManager = pipelineManager;
this.containerManager = containerManager;
super(containerManager, LOG);
}
@Override
public void onMessage(
final IncrementalContainerReportFromDatanode containerReportFromDatanode,
final EventPublisher publisher) {
public void onMessage(final IncrementalContainerReportFromDatanode report,
final EventPublisher publisher) {
for (ContainerReplicaProto replicaProto :
containerReportFromDatanode.getReport().getReportList()) {
report.getReport().getReportList()) {
try {
final DatanodeDetails datanodeDetails = containerReportFromDatanode
.getDatanodeDetails();
final ContainerID containerID = ContainerID
.valueof(replicaProto.getContainerID());
ReportHandlerHelper.processContainerReplica(containerManager,
containerID, replicaProto, datanodeDetails, publisher, LOG);
processContainerReplica(report.getDatanodeDetails(), replicaProto);
} catch (ContainerNotFoundException e) {
LOG.warn("Container {} not found!", replicaProto.getContainerID());
} catch (IOException e) {

View File

@ -169,6 +169,15 @@ public synchronized void start() {
}
}
/**
* Returns true if the Replication Monitor Thread is running.
*
* @return true if running, false otherwise
*/
public boolean isRunning() {
return replicationMonitor.isAlive();
}
/**
* Process all the containers immediately.
*/

View File

@ -1,365 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
/**
* Helper functions to handler container reports.
*/
public final class ReportHandlerHelper {
private ReportHandlerHelper() {}
/**
* Processes the container replica and updates the container state in SCM.
* If needed, sends command to datanode to update the replica state.
*
* @param containerManager ContainerManager instance
* @param containerId Id of the container
* @param replicaProto replica of the container
* @param datanodeDetails datanode where the replica resides
* @param publisher event publisher
* @param logger for logging
* @throws IOException
*/
static void processContainerReplica(final ContainerManager containerManager,
final ContainerID containerId, final ContainerReplicaProto replicaProto,
final DatanodeDetails datanodeDetails, final EventPublisher publisher,
final Logger logger) throws IOException {
final ContainerReplica replica = ContainerReplica.newBuilder()
.setContainerID(containerId)
.setContainerState(replicaProto.getState())
.setDatanodeDetails(datanodeDetails)
.setOriginNodeId(UUID.fromString(replicaProto.getOriginNodeId()))
.setSequenceId(replicaProto.getBlockCommitSequenceId())
.build();
// This is an in-memory update.
containerManager.updateContainerReplica(containerId, replica);
ReportHandlerHelper.reconcileContainerState(containerManager,
containerId, publisher, logger);
final ContainerInfo containerInfo = containerManager
.getContainer(containerId);
if (containerInfo.getUsedBytes() < replicaProto.getUsed()) {
containerInfo.setUsedBytes(replicaProto.getUsed());
}
if (containerInfo.getNumberOfKeys() < replicaProto.getKeyCount()) {
containerInfo.setNumberOfKeys(replicaProto.getKeyCount());
}
// Now we have reconciled the container state. If the container state and
// the replica state doesn't match, then take appropriate action.
ReportHandlerHelper.sendReplicaCommands(
datanodeDetails, containerInfo, replica, publisher, logger);
}
/**
* Reconcile the container state based on the ContainerReplica states.
* ContainerState is updated after the reconciliation.
*
* @param manager ContainerManager
* @param containerId container id
* @throws ContainerNotFoundException
*/
private static void reconcileContainerState(final ContainerManager manager,
final ContainerID containerId, final EventPublisher publisher,
final Logger logger) throws IOException {
// TODO: handle unhealthy replica.
synchronized (manager.getContainer(containerId)) {
final ContainerInfo container = manager.getContainer(containerId);
final Set<ContainerReplica> replicas = manager.getContainerReplicas(
containerId);
final LifeCycleState containerState = container.getState();
switch (containerState) {
case OPEN:
/*
* If the container state is OPEN.
* None of the replica should be in any other state.
*
*/
List<ContainerReplica> invalidReplicas = replicas.stream()
.filter(replica -> replica.getState() != State.OPEN)
.collect(Collectors.toList());
if (!invalidReplicas.isEmpty()) {
logger.warn("Container {} has invalid replica state." +
"Invalid Replicas: {}", containerId, invalidReplicas);
}
// A container cannot be over replicated when in OPEN state.
break;
case CLOSING:
/*
* SCM has asked DataNodes to close the container. Now the replicas
* can be in any of the following states.
*
* 1. OPEN
* 2. CLOSING
* 3. QUASI_CLOSED
* 4. CLOSED
*
* If all the replica are either in OPEN or CLOSING state, do nothing.
*
* If any one of the replica is in QUASI_CLOSED state, move the
* container to QUASI_CLOSED state.
*
* If any one of the replica is in CLOSED state, mark the container as
* CLOSED. The close has happened via Ratis.
*
*/
Optional<ContainerReplica> closedReplica = replicas.stream()
.filter(replica -> replica.getState() == State.CLOSED)
.findFirst();
if (closedReplica.isPresent()) {
container.updateSequenceId(closedReplica.get().getSequenceId());
manager.updateContainerState(
containerId, HddsProtos.LifeCycleEvent.CLOSE);
// TODO: remove container from OPEN pipeline, since the container is
// closed we can go ahead and remove it from Ratis pipeline.
} else if (replicas.stream()
.anyMatch(replica -> replica.getState() == State.QUASI_CLOSED)) {
manager.updateContainerState(
containerId, HddsProtos.LifeCycleEvent.QUASI_CLOSE);
}
break;
case QUASI_CLOSED:
/*
* The container is in QUASI_CLOSED state, this means that at least
* one of the replica is in QUASI_CLOSED/CLOSED state.
* Other replicas can be in any of the following state.
*
* 1. OPEN
* 2. CLOSING
* 3. QUASI_CLOSED
* 4. CLOSED
*
* If <50% of container replicas are in QUASI_CLOSED state and all
* the other replica are either in OPEN or CLOSING state, do nothing.
* We cannot identify the correct replica since we don't have quorum
* yet.
*
* If >50% (quorum) of replicas are in QUASI_CLOSED state and other
* replicas are either in OPEN or CLOSING state, try to identify
* the latest container replica using originNodeId and sequenceId.
* Force close those replica(s) which have the latest sequenceId.
*
* If at least one of the replica is in CLOSED state, mark the
* container as CLOSED. Force close the replicas which matches the
* sequenceId of the CLOSED replica.
*
*/
if (replicas.stream()
.anyMatch(replica -> replica.getState() == State.CLOSED)) {
manager.updateContainerState(
containerId, HddsProtos.LifeCycleEvent.FORCE_CLOSE);
// TODO: remove container from OPEN pipeline, since the container is
// closed we can go ahead and remove it from Ratis pipeline.
} else {
final int replicationFactor = container
.getReplicationFactor().getNumber();
final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
.filter(replica -> replica.getState() == State.QUASI_CLOSED)
.collect(Collectors.toList());
final long uniqueQuasiClosedReplicaCount = quasiClosedReplicas
.stream()
.map(ContainerReplica::getOriginDatanodeId)
.distinct()
.count();
if (uniqueQuasiClosedReplicaCount > (replicationFactor / 2)) {
// Quorum of unique replica has been QUASI_CLOSED
long sequenceId = forceCloseContainerReplicaWithHighestSequenceId(
container, quasiClosedReplicas, publisher);
if (sequenceId != -1L) {
container.updateSequenceId(sequenceId);
}
}
}
break;
case CLOSED:
/*
* The container is already in closed state. do nothing.
*/
break;
case DELETING:
// Not handled.
throw new UnsupportedOperationException("Unsupported container state" +
" 'DELETING'.");
case DELETED:
// Not handled.
throw new UnsupportedOperationException("Unsupported container state" +
" 'DELETED'.");
default:
break;
}
}
}
/**
* Compares the QUASI_CLOSED replicas of a container and sends close command.
*
* @param quasiClosedReplicas list of quasi closed replicas
* @return the sequenceId of the closed replica.
*/
private static long forceCloseContainerReplicaWithHighestSequenceId(
final ContainerInfo container,
final List<ContainerReplica> quasiClosedReplicas,
final EventPublisher publisher) {
final long highestSequenceId = quasiClosedReplicas.stream()
.map(ContainerReplica::getSequenceId)
.max(Long::compare)
.orElse(-1L);
if (highestSequenceId != -1L) {
quasiClosedReplicas.stream()
.filter(replica -> replica.getSequenceId() == highestSequenceId)
.forEach(replica -> {
CloseContainerCommand closeContainerCommand =
new CloseContainerCommand(container.getContainerID(),
container.getPipelineID(), true);
publisher.fireEvent(DATANODE_COMMAND,
new CommandForDatanode<>(
replica.getDatanodeDetails().getUuid(),
closeContainerCommand));
});
}
return highestSequenceId;
}
/**
* Based on the container and replica state, send command to datanode if
* required.
*
* @param datanodeDetails datanode where the replica resides
* @param containerInfo container information
* @param replica replica information
* @param publisher queue to publish the datanode command event
* @param log for logging
*/
static void sendReplicaCommands(
final DatanodeDetails datanodeDetails,
final ContainerInfo containerInfo,
final ContainerReplica replica,
final EventPublisher publisher,
final Logger log) {
final HddsProtos.LifeCycleState containerState = containerInfo.getState();
final ContainerReplicaProto.State replicaState = replica.getState();
if(!ReportHandlerHelper.compareState(containerState, replicaState)) {
if (containerState == HddsProtos.LifeCycleState.OPEN) {
// When a container state in SCM is OPEN, there is no way a datanode
// can quasi close/close the container.
log.warn("Invalid container replica state for container {}" +
" from datanode {}. Expected state is OPEN.",
containerInfo.containerID(), datanodeDetails);
// The replica can go CORRUPT, we have to handle it.
}
if (containerState == HddsProtos.LifeCycleState.CLOSING ||
containerState == HddsProtos.LifeCycleState.QUASI_CLOSED) {
// Resend close container event for this datanode if the container
// replica state is OPEN/CLOSING.
if (replicaState == ContainerReplicaProto.State.OPEN ||
replicaState == ContainerReplicaProto.State.CLOSING) {
CloseContainerCommand closeContainerCommand =
new CloseContainerCommand(containerInfo.getContainerID(),
containerInfo.getPipelineID());
publisher.fireEvent(DATANODE_COMMAND,
new CommandForDatanode<>(
replica.getDatanodeDetails().getUuid(),
closeContainerCommand));
}
}
if (containerState == HddsProtos.LifeCycleState.CLOSED) {
if (replicaState == ContainerReplicaProto.State.OPEN ||
replicaState == ContainerReplicaProto.State.CLOSING ||
replicaState == ContainerReplicaProto.State.QUASI_CLOSED) {
// Send force close container event for this datanode if the container
// replica state is OPEN/CLOSING/QUASI_CLOSED.
// Close command will be send only if this replica matches the
// sequence of the container.
if (containerInfo.getSequenceId() ==
replica.getSequenceId()) {
CloseContainerCommand closeContainerCommand =
new CloseContainerCommand(containerInfo.getContainerID(),
containerInfo.getPipelineID(), true);
publisher.fireEvent(DATANODE_COMMAND,
new CommandForDatanode<>(
replica.getDatanodeDetails().getUuid(),
closeContainerCommand));
}
// TODO: delete the replica if the BCSID doesn't match.
}
}
}
}
/**
* Compares the container and replica state.
*
* @param containerState container state
* @param replicaState replica state
* @return true if the states are same, else false
*/
private static boolean compareState(final LifeCycleState containerState,
final State replicaState) {
// TODO: handle unhealthy replica.
switch (containerState) {
case OPEN:
return replicaState == State.OPEN;
case CLOSING:
return replicaState == State.CLOSING;
case QUASI_CLOSED:
return replicaState == State.QUASI_CLOSED;
case CLOSED:
return replicaState == State.CLOSED;
case DELETING:
return false;
case DELETED:
return false;
default:
return false;
}
}
}

View File

@ -295,18 +295,20 @@ public HddsProtos.LifeCycleState updateContainerState(
// Should we return the updated ContainerInfo instead of LifeCycleState?
lock.lock();
try {
ContainerInfo container = containerStateManager.getContainer(containerID);
ContainerInfo updatedContainer =
updateContainerStateInternal(containerID, event);
if (updatedContainer.getState() != LifeCycleState.OPEN
&& container.getState() == LifeCycleState.OPEN) {
final ContainerInfo container = containerStateManager
.getContainer(containerID);
final LifeCycleState oldState = container.getState();
containerStateManager.updateContainerState(containerID, event);
final LifeCycleState newState = container.getState();
if (oldState == LifeCycleState.OPEN && newState != LifeCycleState.OPEN) {
pipelineManager
.removeContainerFromPipeline(updatedContainer.getPipelineID(),
.removeContainerFromPipeline(container.getPipelineID(),
containerID);
}
final byte[] dbKey = Longs.toByteArray(containerID.getId());
containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
return updatedContainer.getState();
containerStore.put(dbKey, container.getProtobuf().toByteArray());
return newState;
} catch (ContainerNotFoundException cnfe) {
throw new SCMException(
"Failed to update container state"
@ -318,11 +320,6 @@ public HddsProtos.LifeCycleState updateContainerState(
}
}
private ContainerInfo updateContainerStateInternal(ContainerID containerID,
HddsProtos.LifeCycleEvent event) throws IOException {
return containerStateManager.updateContainerState(containerID, event);
}
/**
* Update deleteTransactionId according to deleteTransactionMap.

View File

@ -296,8 +296,7 @@ public void updateState(ContainerID containerID, LifeCycleState currentState,
checkIfContainerExist(containerID);
final ContainerInfo currentInfo = containerMap.get(containerID);
try {
final ContainerInfo newInfo = new ContainerInfo(currentInfo);
newInfo.setState(newState);
currentInfo.setState(newState);
// We are updating two places before this update is done, these can
// fail independently, since the code needs to handle it.
@ -309,13 +308,12 @@ public void updateState(ContainerID containerID, LifeCycleState currentState,
// roll back the earlier change we did. If the rollback fails, we can
// be in an inconsistent state,
containerMap.put(containerID, newInfo);
lifeCycleStateMap.update(currentState, newState, containerID);
LOG.trace("Updated the container {} to new state. Old = {}, new = " +
"{}", containerID, currentState, newState);
// Just flush both old and new data sets from the result cache.
flushCache(currentInfo, newInfo);
flushCache(currentInfo);
} catch (SCMException ex) {
LOG.error("Unable to update the container state. {}", ex);
// we need to revert the change in this attribute since we are not
@ -324,7 +322,7 @@ public void updateState(ContainerID containerID, LifeCycleState currentState,
"old state. Old = {}, Attempted state = {}", currentState,
newState);
containerMap.put(containerID, currentInfo);
currentInfo.setState(currentState);
// if this line throws, the state map can be in an inconsistent
// state, since we will have modified the attribute by the

View File

@ -23,7 +23,7 @@
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;

View File

@ -55,8 +55,7 @@
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
@ -99,7 +98,6 @@
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.utils.HddsVersionInfo;
import org.apache.hadoop.utils.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -176,7 +174,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private SCMMetadataStore scmMetadataStore;
private final EventQueue eventQueue;
private final Scheduler commonScheduler;
/*
* HTTP endpoint for JMX access.
*/
@ -199,7 +196,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
private final LeaseManager<Long> commandWatcherLeaseManager;
private final ReplicationActivityStatus replicationStatus;
private SCMChillModeManager scmChillModeManager;
private CertificateServer certificateServer;
@ -287,8 +283,6 @@ public StorageContainerManager(OzoneConfiguration conf,
commandWatcherLeaseManager = new LeaseManager<>("CommandWatcher",
watcherTimeout);
initalizeSystemManagers(conf, configurator);
commonScheduler = new Scheduler("SCMCommonScheduler", false, 1);
replicationStatus = new ReplicationActivityStatus(commonScheduler);
CloseContainerEventHandler closeContainerHandler =
new CloseContainerEventHandler(pipelineManager, containerManager);
@ -311,12 +305,10 @@ public StorageContainerManager(OzoneConfiguration conf,
new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
ContainerReportHandler containerReportHandler =
new ContainerReportHandler(scmNodeManager, pipelineManager,
containerManager, replicationStatus);
new ContainerReportHandler(scmNodeManager, containerManager);
IncrementalContainerReportHandler incrementalContainerReportHandler =
new IncrementalContainerReportHandler(
pipelineManager, containerManager);
new IncrementalContainerReportHandler(containerManager);
PipelineActionHandler pipelineActionHandler =
new PipelineActionHandler(pipelineManager, conf);
@ -343,7 +335,7 @@ public StorageContainerManager(OzoneConfiguration conf,
httpServer = new StorageContainerManagerHttpServer(conf);
chillModeHandler = new ChillModeHandler(configuration,
clientProtocolServer, scmBlockManager, replicationStatus);
clientProtocolServer, scmBlockManager, replicationManager);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.RETRIABLE_DATANODE_COMMAND, scmNodeManager);
@ -422,8 +414,8 @@ private void initalizeSystemManagers(OzoneConfiguration conf,
if (configurator.getReplicationManager() != null) {
replicationManager = configurator.getReplicationManager();
} else {
replicationManager = new ReplicationManager(containerPlacementPolicy,
containerManager, eventQueue, commandWatcherLeaseManager);
replicationManager = new ReplicationManager(conf,
containerManager, containerPlacementPolicy, eventQueue);
}
if(configurator.getScmChillModeManager() != null) {
scmChillModeManager = configurator.getScmChillModeManager();
@ -917,8 +909,6 @@ public void start() throws IOException {
httpServer.start();
scmBlockManager.start();
replicationStatus.start();
replicationManager.start();
// Start jvm monitor
jvmPauseMonitor = new JvmPauseMonitor();
@ -933,14 +923,6 @@ public void start() throws IOException {
*/
public void stop() {
try {
LOG.info("Stopping Replication Activity Status tracker.");
replicationStatus.close();
} catch (Exception ex) {
LOG.error("Replication Activity Status tracker stop failed.", ex);
}
try {
LOG.info("Stopping Replication Manager Service.");
replicationManager.stop();
@ -1017,13 +999,6 @@ public void stop() {
LOG.error("SCM Event Queue stop failed", ex);
}
try {
LOG.info("Stopping SCM Common Scheduler.");
commonScheduler.close();
} catch (Exception ex) {
LOG.error("SCM Common Scheduler close failed {}", ex);
}
if (jvmPauseMonitor != null) {
jvmPauseMonitor.stop();
}

View File

@ -533,6 +533,7 @@ public static ContainerInfo getContainer(
.setReplicationType(HddsProtos.ReplicationType.RATIS)
.setReplicationFactor(HddsProtos.ReplicationFactor.THREE)
.setState(state)
.setSequenceId(10000L)
.setOwner("TEST")
.build();
}

View File

@ -22,16 +22,20 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.Scheduler;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.HashSet;
/**
* Tests ChillModeHandler behavior.
*/
@ -40,7 +44,7 @@ public class TestChillModeHandler {
private OzoneConfiguration configuration;
private SCMClientProtocolServer scmClientProtocolServer;
private ReplicationActivityStatus replicationActivityStatus;
private ReplicationManager replicationManager;
private BlockManager blockManager;
private ChillModeHandler chillModeHandler;
private EventQueue eventQueue;
@ -54,15 +58,19 @@ public void setup(boolean enabled) {
"3s");
scmClientProtocolServer =
Mockito.mock(SCMClientProtocolServer.class);
replicationActivityStatus = new ReplicationActivityStatus(
new Scheduler("SCMCommonScheduler", false, 1));
eventQueue = new EventQueue();
final ContainerManager containerManager =
Mockito.mock(ContainerManager.class);
Mockito.when(containerManager.getContainerIDs())
.thenReturn(new HashSet<>());
replicationManager = new ReplicationManager(configuration,
containerManager, Mockito.mock(ContainerPlacementPolicy.class),
eventQueue);
blockManager = Mockito.mock(BlockManagerImpl.class);
chillModeHandler =
new ChillModeHandler(configuration, scmClientProtocolServer,
blockManager, replicationActivityStatus);
blockManager, replicationManager);
eventQueue = new EventQueue();
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
chillModeStatus = new SCMChillModeManager.ChillModeStatus(false);
@ -82,7 +90,7 @@ public void testChillModeHandlerWithChillModeEnabled() throws Exception {
Assert.assertFalse(scmClientProtocolServer.getChillModeStatus());
Assert.assertFalse(((BlockManagerImpl) blockManager).isScmInChillMode());
GenericTestUtils.waitFor(() ->
replicationActivityStatus.isReplicationEnabled(), 1000, 5000);
replicationManager.isRunning(), 1000, 5000);
}
@ -99,6 +107,6 @@ public void testChillModeHandlerWithChillModeDisbaled() throws Exception{
Assert.assertFalse(scmClientProtocolServer.getChillModeStatus());
Assert.assertFalse(((BlockManagerImpl) blockManager).isScmInChillMode());
GenericTestUtils.waitFor(() ->
replicationActivityStatus.isReplicationEnabled(), 1000, 5000);
replicationManager.isRunning(), 1000, 5000);
}
}

View File

@ -16,124 +16,146 @@
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server
.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.utils.Scheduler;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hadoop.hdds.scm.TestUtils
.getReplicas;
import static org.apache.hadoop.hdds.scm.TestUtils
.getContainer;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.addContainerToContainerManager;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.mockUpdateContainerReplica;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.mockUpdateContainerState;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
/**
* Test the behaviour of the ContainerReportHandler.
*/
public class TestContainerReportHandler {
private static Scheduler scheduler;
private NodeManager nodeManager;
private ContainerManager containerManager;
private ContainerStateManager containerStateManager;
private EventPublisher publisher;
@Before
public void setup() throws IOException {
final Configuration conf = new OzoneConfiguration();
this.nodeManager = new MockNodeManager(true, 10);
this.containerManager = Mockito.mock(ContainerManager.class);
this.containerStateManager = new ContainerStateManager(conf);
this.publisher = Mockito.mock(EventPublisher.class);
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainer((ContainerID)invocation.getArguments()[0]));
Mockito.when(containerManager.getContainerReplicas(
Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainerReplicas((ContainerID)invocation.getArguments()[0]));
Mockito.doAnswer(invocation -> {
containerStateManager
.updateContainerState((ContainerID)invocation.getArguments()[0],
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerState(
Mockito.any(ContainerID.class),
Mockito.any(HddsProtos.LifeCycleEvent.class));
Mockito.doAnswer(invocation -> {
containerStateManager.updateContainerReplica(
(ContainerID) invocation.getArguments()[0],
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerReplica(
Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
Mockito.doAnswer(invocation -> {
containerStateManager.removeContainerReplica(
(ContainerID) invocation.getArguments()[0],
(ContainerReplica) invocation.getArguments()[1]);
return null;
}).when(containerManager).removeContainerReplica(
Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
@BeforeClass
public static void setup() {
scheduler = new Scheduler("SCMCommonScheduler", false, 1);
}
@AfterClass
public static void tearDown() {
scheduler.close();
@After
public void tearDown() throws IOException {
containerStateManager.close();
}
@Test
public void testUnderReplicatedContainer()
throws NodeNotFoundException, ContainerNotFoundException,
ContainerReplicaNotFoundException {
throws NodeNotFoundException, ContainerNotFoundException, SCMException {
final NodeManager nodeManager = new MockNodeManager(true, 10);
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final ReplicationActivityStatus replicationActivityStatus =
new ReplicationActivityStatus(scheduler);
replicationActivityStatus.enableReplication();
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, pipelineManager, containerManager,
replicationActivityStatus);
nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
final Set<ContainerReplica> containerOneReplicas = getReplicas(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree);
final Set<ContainerReplica> containerTwoReplicas = getReplicas(
containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree);
nodeManager.setContainers(datanodeOne, containerIDSet);
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
addContainerToContainerManager(
containerManager, containerOne, containerOneReplicas);
addContainerToContainerManager(
containerManager, containerTwo, containerTwoReplicas);
containerStateManager.loadContainer(containerOne);
containerStateManager.loadContainer(containerTwo);
Mockito.doAnswer((Answer<Void>) invocation -> {
Object[] args = invocation.getArguments();
if (args[0].equals(containerOne.containerID())) {
ContainerReplica replica = (ContainerReplica) args[1];
containerOneReplicas.remove(replica);
}
return null;
}).when(containerManager).removeContainerReplica(
Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
getReplicas(containerOne.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree)
.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerOne.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
getReplicas(containerTwo.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree)
.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerTwo.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
Mockito.when(
containerManager.getContainerReplicas(containerOne.containerID()))
.thenReturn(containerOneReplicas);
Mockito.when(
containerManager.getContainerReplicas(containerTwo.containerID()))
.thenReturn(containerTwoReplicas);
// SCM expects both containerOne and containerTwo to be in all the three
// datanodes datanodeOne, datanodeTwo and datanodeThree
@ -145,70 +167,65 @@ public void testUnderReplicatedContainer()
final ContainerReportsProto containerReport = getContainerReportsProto(
containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
Assert.assertEquals(2, containerManager.getContainerReplicas(
containerOne.containerID()).size());
// Now we should get a replication request for containerOne
Mockito.verify(publisher, Mockito.times(1))
.fireEvent(Mockito.any(), Mockito.any());
// TODO: verify whether are actually getting a replication request event
// for containerOne
}
@Test
public void testOverReplicatedContainer() throws NodeNotFoundException,
ContainerNotFoundException {
SCMException, ContainerNotFoundException {
final NodeManager nodeManager = new MockNodeManager(true, 10);
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final ReplicationActivityStatus replicationActivityStatus =
new ReplicationActivityStatus(scheduler);
replicationActivityStatus.enableReplication();
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, pipelineManager, containerManager,
replicationActivityStatus);
nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final DatanodeDetails datanodeFour = nodeIterator.next();
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSED);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
final Set<ContainerReplica> containerOneReplicas = getReplicas(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree);
final Set<ContainerReplica> containerTwoReplicas = getReplicas(
containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree);
nodeManager.setContainers(datanodeOne, containerIDSet);
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
addContainerToContainerManager(
containerManager, containerOne, containerOneReplicas);
addContainerToContainerManager(
containerManager, containerTwo, containerTwoReplicas);
containerStateManager.loadContainer(containerOne);
containerStateManager.loadContainer(containerTwo);
Mockito.doAnswer((Answer<Void>) invocation -> {
Object[] args = invocation.getArguments();
if (args[0].equals(containerOne.containerID())) {
containerOneReplicas.add((ContainerReplica) args[1]);
}
return null;
}).when(containerManager).updateContainerReplica(
Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
getReplicas(containerOne.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree)
.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerOne.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
getReplicas(containerTwo.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree)
.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerTwo.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
// SCM expects both containerOne and containerTwo to be in all the three
@ -220,114 +237,15 @@ public void testOverReplicatedContainer() throws NodeNotFoundException,
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
datanodeFour.getUuidString());
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeFour, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
Mockito.verify(publisher, Mockito.times(1))
.fireEvent(Mockito.any(), Mockito.any());
// TODO: verify whether are actually getting a replication request event
// for containerOne
Assert.assertEquals(4, containerManager.getContainerReplicas(
containerOne.containerID()).size());
}
@Test
public void testOpenToClosing()
throws NodeNotFoundException, ContainerNotFoundException {
/*
* The container is in CLOSING state and all the replicas are either in
* OPEN or CLOSING state.
*
* The datanode reports that the replica is still in OPEN state.
*
* In this case SCM should trigger close container event to the datanode.
*/
final NodeManager nodeManager = new MockNodeManager(true, 10);
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final ReplicationActivityStatus replicationActivityStatus =
new ReplicationActivityStatus(scheduler);
replicationActivityStatus.enableReplication();
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, pipelineManager, containerManager,
replicationActivityStatus);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
final Set<ContainerReplica> containerOneReplicas = getReplicas(
containerOne.containerID(), ContainerReplicaProto.State.OPEN,
datanodeOne);
containerOneReplicas.addAll(getReplicas(
containerOne.containerID(), ContainerReplicaProto.State.CLOSING,
datanodeTwo, datanodeThree));
final Set<ContainerReplica> containerTwoReplicas = getReplicas(
containerTwo.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree);
nodeManager.setContainers(datanodeOne, containerIDSet);
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
addContainerToContainerManager(
containerManager, containerOne, containerOneReplicas);
addContainerToContainerManager(
containerManager, containerTwo, containerTwoReplicas);
mockUpdateContainerReplica(
containerManager, containerOne, containerOneReplicas);
// Replica in datanodeOne of containerOne is in OPEN state.
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.OPEN,
datanodeOne.getUuidString());
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
// Now we should get close container event for containerOne on datanodeOne
Mockito.verify(publisher, Mockito.times(1))
.fireEvent(Mockito.any(), Mockito.any());
// TODO: verify whether are actually getting a close container
// datanode command for containerOne/datanodeOne
/*
* The container is in CLOSING state and all the replicas are either in
* OPEN or CLOSING state.
*
* The datanode reports that the replica is in CLOSING state.
*
* In this case SCM should trigger close container event to the datanode.
*/
// Replica in datanodeOne of containerOne is in OPEN state.
final ContainerReportsProto containerReportTwo = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.OPEN,
datanodeOne.getUuidString());
final ContainerReportFromDatanode containerReportTwoFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReportTwo);
reportHandler.onMessage(containerReportTwoFromDatanode, publisher);
// Now we should get close container event for containerOne on datanodeOne
Mockito.verify(publisher, Mockito.times(2))
.fireEvent(Mockito.any(), Mockito.any());
// TODO: verify whether are actually getting a close container
// datanode command for containerOne/datanodeOne
}
@Test
public void testClosingToClosed() throws NodeNotFoundException, IOException {
@ -339,27 +257,23 @@ public void testClosingToClosed() throws NodeNotFoundException, IOException {
*
* In this case SCM should mark the container as CLOSED.
*/
final NodeManager nodeManager = new MockNodeManager(true, 10);
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final ReplicationActivityStatus replicationActivityStatus =
new ReplicationActivityStatus(scheduler);
replicationActivityStatus.enableReplication();
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, pipelineManager, containerManager,
replicationActivityStatus);
nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
final Set<ContainerReplica> containerOneReplicas = getReplicas(
containerOne.containerID(),
ContainerReplicaProto.State.CLOSING,
@ -379,25 +293,36 @@ public void testClosingToClosed() throws NodeNotFoundException, IOException {
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
addContainerToContainerManager(
containerManager, containerOne, containerOneReplicas);
addContainerToContainerManager(
containerManager, containerTwo, containerTwoReplicas);
mockUpdateContainerReplica(
containerManager, containerOne, containerOneReplicas);
mockUpdateContainerState(containerManager, containerOne,
LifeCycleEvent.CLOSE, LifeCycleState.CLOSED);
containerStateManager.loadContainer(containerOne);
containerStateManager.loadContainer(containerTwo);
containerOneReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerTwo.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
containerTwoReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerTwo.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
Assert.assertEquals(
LifeCycleState.CLOSED, containerOne.getState());
Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
}
@Test
@ -411,29 +336,23 @@ public void testClosingToQuasiClosed()
*
* In this case SCM should move the container to QUASI_CLOSED.
*/
final NodeManager nodeManager = new MockNodeManager(true, 10);
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final ReplicationActivityStatus replicationActivityStatus =
new ReplicationActivityStatus(scheduler);
replicationActivityStatus.enableReplication();
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, pipelineManager, containerManager,
replicationActivityStatus);
nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final ContainerInfo containerOne =
getContainer(LifeCycleState.CLOSING);
final ContainerInfo containerTwo =
getContainer(LifeCycleState.CLOSED);
final ContainerInfo containerOne = getContainer(LifeCycleState.CLOSING);
final ContainerInfo containerTwo = getContainer(LifeCycleState.CLOSED);
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
final Set<ContainerReplica> containerOneReplicas = getReplicas(
containerOne.containerID(),
ContainerReplicaProto.State.CLOSING,
@ -451,60 +370,65 @@ public void testClosingToQuasiClosed()
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
addContainerToContainerManager(
containerManager, containerOne, containerOneReplicas);
addContainerToContainerManager(
containerManager, containerTwo, containerTwoReplicas);
mockUpdateContainerReplica(
containerManager, containerOne, containerOneReplicas);
mockUpdateContainerState(containerManager, containerOne,
LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED);
containerStateManager.loadContainer(containerOne);
containerStateManager.loadContainer(containerTwo);
containerOneReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerTwo.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
containerTwoReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerTwo.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
datanodeOne.getUuidString());
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
Assert.assertEquals(
LifeCycleState.QUASI_CLOSED, containerOne.getState());
Assert.assertEquals(LifeCycleState.QUASI_CLOSED, containerOne.getState());
}
@Test
public void testQuasiClosedWithDifferentOriginNodeReplica()
public void testQuasiClosedToClosed()
throws NodeNotFoundException, IOException {
/*
* The container is in QUASI_CLOSED state.
* - One of the replica is in QUASI_CLOSED state
* - The other two replica are in OPEN/CLOSING state
*
* The datanode reports the second replica is now QUASI_CLOSED.
* The datanode reports the second replica is now CLOSED.
*
* In this case SCM should CLOSE the container with highest BCSID and
* send force close command to the datanode.
* In this case SCM should CLOSE the container.
*/
final NodeManager nodeManager = new MockNodeManager(true, 10);
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final ReplicationActivityStatus replicationActivityStatus =
new ReplicationActivityStatus(scheduler);
replicationActivityStatus.enableReplication();
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, pipelineManager, containerManager,
replicationActivityStatus);
nodeManager, containerManager);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final ContainerInfo containerOne =
getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerInfo containerTwo =
getContainer(LifeCycleState.CLOSED);
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
@ -526,139 +450,42 @@ public void testQuasiClosedWithDifferentOriginNodeReplica()
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree, containerIDSet);
addContainerToContainerManager(
containerManager, containerOne, containerOneReplicas);
addContainerToContainerManager(
containerManager, containerTwo, containerTwoReplicas);
mockUpdateContainerReplica(
containerManager, containerOne, containerOneReplicas);
mockUpdateContainerState(containerManager, containerOne,
LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
containerStateManager.loadContainer(containerOne);
containerStateManager.loadContainer(containerTwo);
containerOneReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerTwo.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
containerTwoReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
containerTwo.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
// Container replica with datanodeOne as originNodeId is already
// QUASI_CLOSED. Now we will tell SCM that container replica from
// datanodeTwo is also QUASI_CLOSED, but has higher sequenceId.
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
datanodeTwo.getUuidString(), 999999L);
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeTwo, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
// Now we should get force close container event for containerOne on
// datanodeTwo
Mockito.verify(publisher, Mockito.times(1))
.fireEvent(Mockito.any(), Mockito.any());
// TODO: verify whether are actually getting a force close container
// datanode command for containerOne/datanodeTwo
// The sequence id of the container should have been updated.
Assert.assertEquals(999999L, containerOne.getSequenceId());
// Now datanodeTwo should close containerOne.
final ContainerReportsProto containerReportTwo = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.CLOSED,
datanodeTwo.getUuidString(), 999999L);
final ContainerReportFromDatanode containerReportFromDatanodeTwo =
new ContainerReportFromDatanode(datanodeTwo, containerReportTwo);
reportHandler.onMessage(containerReportFromDatanodeTwo, publisher);
// The container should be closed in SCM now.
Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
}
@Test
public void testQuasiClosedWithSameOriginNodeReplica()
throws NodeNotFoundException, IOException {
/*
* The container is in QUASI_CLOSED state.
* - One of the replica is in QUASI_CLOSED state
* - The other two replica are in OPEN/CLOSING state
*
* The datanode reports a QUASI_CLOSED replica which has the same
* origin node id as the existing QUASI_CLOSED replica.
*
* In this case SCM should not CLOSE the container.
*/
final NodeManager nodeManager = new MockNodeManager(true, 10);
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final ReplicationActivityStatus replicationActivityStatus =
new ReplicationActivityStatus(scheduler);
replicationActivityStatus.enableReplication();
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final ContainerReportHandler reportHandler = new ContainerReportHandler(
nodeManager, pipelineManager, containerManager,
replicationActivityStatus);
final Iterator<DatanodeDetails> nodeIterator = nodeManager.getNodes(
NodeState.HEALTHY).iterator();
final DatanodeDetails datanodeOne = nodeIterator.next();
final DatanodeDetails datanodeTwo = nodeIterator.next();
final DatanodeDetails datanodeThree = nodeIterator.next();
final ContainerInfo containerOne =
getContainer(LifeCycleState.QUASI_CLOSED);
final ContainerInfo containerTwo =
getContainer(LifeCycleState.CLOSED);
final Set<ContainerID> containerIDSet = Stream.of(
containerOne.containerID(), containerTwo.containerID())
.collect(Collectors.toSet());
final Set<ContainerReplica> containerOneReplicas = getReplicas(
containerOne.containerID(),
ContainerReplicaProto.State.QUASI_CLOSED,
datanodeOne);
containerOneReplicas.addAll(getReplicas(
containerOne.containerID(),
ContainerReplicaProto.State.CLOSING,
datanodeTwo));
final Set<ContainerReplica> containerTwoReplicas = getReplicas(
containerTwo.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne, datanodeTwo, datanodeThree);
nodeManager.setContainers(datanodeOne, containerIDSet);
nodeManager.setContainers(datanodeTwo, containerIDSet);
nodeManager.setContainers(datanodeThree,
Collections.singleton(containerTwo.containerID()));
addContainerToContainerManager(
containerManager, containerOne, containerOneReplicas);
addContainerToContainerManager(
containerManager, containerTwo, containerTwoReplicas);
mockUpdateContainerReplica(
containerManager, containerOne, containerOneReplicas);
mockUpdateContainerState(containerManager, containerOne,
LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
// containerOne is QUASI_CLOSED in datanodeOne and CLOSING in datanodeTwo.
// Now datanodeThree is sending container report which says that it has
// containerOne replica, but the originNodeId of this replica is
// datanodeOne. In this case we should not force close the container even
// though we got two QUASI_CLOSED replicas.
final ContainerReportsProto containerReport = getContainerReportsProto(
containerOne.containerID(), ContainerReplicaProto.State.QUASI_CLOSED,
datanodeOne.getUuidString());
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final ContainerReportFromDatanode containerReportFromDatanode =
new ContainerReportFromDatanode(datanodeThree, containerReport);
new ContainerReportFromDatanode(datanodeOne, containerReport);
reportHandler.onMessage(containerReportFromDatanode, publisher);
Mockito.verify(publisher, Mockito.times(0))
.fireEvent(Mockito.any(), Mockito.any());
Assert.assertEquals(LifeCycleState.CLOSED, containerOne.getState());
}
private static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId) {
return getContainerReportsProto(containerId, state, originNodeId, 100L);
}
private static ContainerReportsProto getContainerReportsProto(
final ContainerID containerId, final ContainerReplicaProto.State state,
final String originNodeId, final long bcsid) {
final ContainerReportsProto.Builder crBuilder =
ContainerReportsProto.newBuilder();
final ContainerReplicaProto replicaProto =
@ -674,7 +501,7 @@ private static ContainerReportsProto getContainerReportsProto(
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
.setBlockCommitSequenceId(bcsid)
.setBlockCommitSequenceId(10000L)
.setDeleteTransactionId(0)
.build();
return crBuilder.addReports(replicaProto).build();

View File

@ -1,73 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.Set;
/**
* Helper methods for testing ContainerReportHandler and
* IncrementalContainerReportHandler.
*/
public final class TestContainerReportHelper {
private TestContainerReportHelper() {}
static void addContainerToContainerManager(
final ContainerManager containerManager, final ContainerInfo container,
final Set<ContainerReplica> replicas) throws ContainerNotFoundException {
Mockito.when(containerManager.getContainer(container.containerID()))
.thenReturn(container);
Mockito.when(
containerManager.getContainerReplicas(container.containerID()))
.thenReturn(replicas);
}
static void mockUpdateContainerReplica(
final ContainerManager containerManager,
final ContainerInfo containerInfo, final Set<ContainerReplica> replicas)
throws ContainerNotFoundException {
Mockito.doAnswer((Answer<Void>) invocation -> {
Object[] args = invocation.getArguments();
if (args[0].equals(containerInfo.containerID())) {
ContainerReplica replica = (ContainerReplica) args[1];
replicas.remove(replica);
replicas.add(replica);
}
return null;
}).when(containerManager).updateContainerReplica(
Mockito.any(ContainerID.class), Mockito.any(ContainerReplica.class));
}
static void mockUpdateContainerState(
final ContainerManager containerManager,
final ContainerInfo containerInfo,
final LifeCycleEvent event, final LifeCycleState state)
throws IOException {
Mockito.doAnswer((Answer<LifeCycleState>) invocation -> {
containerInfo.setState(state);
return containerInfo.getState();
}).when(containerManager).updateContainerState(
containerInfo.containerID(), event);
}
}

View File

@ -17,47 +17,78 @@
*/
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.IncrementalContainerReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.Set;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.addContainerToContainerManager;
import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.mockUpdateContainerReplica;
import static org.apache.hadoop.hdds.scm.container
.TestContainerReportHelper.mockUpdateContainerState;
/**
* Test cases to verify the functionality of IncrementalContainerReportHandler.
*/
public class TestIncrementalContainerReportHandler {
private ContainerManager containerManager;
private ContainerStateManager containerStateManager;
private EventPublisher publisher;
@Before
public void setup() throws IOException {
final Configuration conf = new OzoneConfiguration();
this.containerManager = Mockito.mock(ContainerManager.class);
this.containerStateManager = new ContainerStateManager(conf);
this.publisher = Mockito.mock(EventPublisher.class);
Mockito.when(containerManager.getContainer(Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainer((ContainerID)invocation.getArguments()[0]));
Mockito.when(containerManager.getContainerReplicas(
Mockito.any(ContainerID.class)))
.thenAnswer(invocation -> containerStateManager
.getContainerReplicas((ContainerID)invocation.getArguments()[0]));
Mockito.doAnswer(invocation -> {
containerStateManager
.updateContainerState((ContainerID)invocation.getArguments()[0],
(HddsProtos.LifeCycleEvent)invocation.getArguments()[1]);
return null;
}).when(containerManager).updateContainerState(
Mockito.any(ContainerID.class),
Mockito.any(HddsProtos.LifeCycleEvent.class));
}
@After
public void tearDown() throws IOException {
containerStateManager.close();
}
@Test
public void testClosingToClosed() throws IOException {
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(
pipelineManager, containerManager);
new IncrementalContainerReportHandler(containerManager);
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@ -67,34 +98,31 @@ public void testClosingToClosed() throws IOException {
ContainerReplicaProto.State.CLOSING,
datanodeOne, datanodeTwo, datanodeThree);
addContainerToContainerManager(
containerManager, container, containerReplicas);
mockUpdateContainerReplica(
containerManager, container, containerReplicas);
mockUpdateContainerState(containerManager, container,
LifeCycleEvent.CLOSE, LifeCycleState.CLOSED);
containerStateManager.loadContainer(container);
containerReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
container.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString());
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final IncrementalContainerReportFromDatanode icrFromDatanode =
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);
reportHandler.onMessage(icrFromDatanode, publisher);
Assert.assertEquals(
LifeCycleState.CLOSED, container.getState());
Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
}
@Test
public void testClosingToQuasiClosed() throws IOException {
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(
pipelineManager, containerManager);
new IncrementalContainerReportHandler(containerManager);
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@ -104,34 +132,32 @@ public void testClosingToQuasiClosed() throws IOException {
ContainerReplicaProto.State.CLOSING,
datanodeOne, datanodeTwo, datanodeThree);
addContainerToContainerManager(
containerManager, container, containerReplicas);
mockUpdateContainerReplica(
containerManager, container, containerReplicas);
mockUpdateContainerState(containerManager, container,
LifeCycleEvent.QUASI_CLOSE, LifeCycleState.QUASI_CLOSED);
containerStateManager.loadContainer(container);
containerReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
container.containerID(), r);
} catch (ContainerNotFoundException ignored) {
}
});
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.QUASI_CLOSED,
datanodeOne.getUuidString());
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final IncrementalContainerReportFromDatanode icrFromDatanode =
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);
reportHandler.onMessage(icrFromDatanode, publisher);
Assert.assertEquals(
LifeCycleState.QUASI_CLOSED, container.getState());
Assert.assertEquals(LifeCycleState.QUASI_CLOSED, container.getState());
}
@Test
public void testQuasiClosedToClosed() throws IOException {
final ContainerManager containerManager = Mockito.mock(
ContainerManager.class);
final PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
final IncrementalContainerReportHandler reportHandler =
new IncrementalContainerReportHandler(
pipelineManager, containerManager);
new IncrementalContainerReportHandler(containerManager);
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
@ -145,38 +171,25 @@ public void testQuasiClosedToClosed() throws IOException {
ContainerReplicaProto.State.QUASI_CLOSED,
datanodeThree));
containerStateManager.loadContainer(container);
containerReplicas.forEach(r -> {
try {
containerStateManager.updateContainerReplica(
container.containerID(), r);
} catch (ContainerNotFoundException ignored) {
addContainerToContainerManager(
containerManager, container, containerReplicas);
mockUpdateContainerReplica(
containerManager, container, containerReplicas);
mockUpdateContainerState(containerManager, container,
LifeCycleEvent.FORCE_CLOSE, LifeCycleState.CLOSED);
}
});
final IncrementalContainerReportProto containerReport =
getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.QUASI_CLOSED,
datanodeOne.getUuidString(), 999999L);
final EventPublisher publisher = Mockito.mock(EventPublisher.class);
final IncrementalContainerReportFromDatanode icrFromDatanode =
ContainerReplicaProto.State.CLOSED,
datanodeThree.getUuidString());
final IncrementalContainerReportFromDatanode icr =
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReport);
reportHandler.onMessage(icrFromDatanode, publisher);
// SCM should issue force close.
Mockito.verify(publisher, Mockito.times(1))
.fireEvent(Mockito.any(), Mockito.any());
final IncrementalContainerReportProto containerReportTwo =
getIncrementalContainerReportProto(container.containerID(),
ContainerReplicaProto.State.CLOSED,
datanodeOne.getUuidString(), 999999L);
final IncrementalContainerReportFromDatanode icrTwoFromDatanode =
new IncrementalContainerReportFromDatanode(
datanodeOne, containerReportTwo);
reportHandler.onMessage(icrTwoFromDatanode, publisher);
Assert.assertEquals(
LifeCycleState.CLOSED, container.getState());
reportHandler.onMessage(icr, publisher);
Assert.assertEquals(LifeCycleState.CLOSED, container.getState());
}
private static IncrementalContainerReportProto
@ -184,15 +197,6 @@ public void testQuasiClosedToClosed() throws IOException {
final ContainerID containerId,
final ContainerReplicaProto.State state,
final String originNodeId) {
return getIncrementalContainerReportProto(
containerId, state, originNodeId, 100L);
}
private static IncrementalContainerReportProto
getIncrementalContainerReportProto(
final ContainerID containerId,
final ContainerReplicaProto.State state,
final String originNodeId, final long bcsid) {
final IncrementalContainerReportProto.Builder crBuilder =
IncrementalContainerReportProto.newBuilder();
final ContainerReplicaProto replicaProto =
@ -208,7 +212,7 @@ public void testQuasiClosedToClosed() throws IOException {
.setWriteCount(100000000L)
.setReadBytes(2000000000L)
.setWriteBytes(2000000000L)
.setBlockCommitSequenceId(bcsid)
.setBlockCommitSequenceId(10000L)
.setDeleteTransactionId(0)
.build();
return crBuilder.addReport(replicaProto).build();

View File

@ -1,85 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.container.replication;
import static org.junit.Assert.*;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
import org.apache.hadoop.hdds.scm.chillmode.SCMChillModeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMClientProtocolServer;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.utils.Scheduler;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Tests for ReplicationActivityStatus.
*/
public class TestReplicationActivityStatus {
private static EventQueue eventQueue;
private static ReplicationActivityStatus replicationActivityStatus;
@BeforeClass
public static void setup() {
eventQueue = new EventQueue();
replicationActivityStatus = new ReplicationActivityStatus(
new Scheduler("SCMCommonScheduler", false, 1));
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
ozoneConfiguration.set(HddsConfigKeys.
HDDS_SCM_WAIT_TIME_AFTER_CHILL_MODE_EXIT, "3s");
SCMClientProtocolServer scmClientProtocolServer =
Mockito.mock(SCMClientProtocolServer.class);
BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
ChillModeHandler chillModeHandler =
new ChillModeHandler(ozoneConfiguration, scmClientProtocolServer,
blockManager, replicationActivityStatus);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
}
@Test
public void testReplicationStatusForChillMode()
throws TimeoutException, InterruptedException {
assertFalse(replicationActivityStatus.isReplicationEnabled());
// In chill mode replication process should be stopped.
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
new SCMChillModeManager.ChillModeStatus(true));
assertFalse(replicationActivityStatus.isReplicationEnabled());
// Replication should be enabled when chill mode if off.
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS,
new SCMChillModeManager.ChillModeStatus(false));
GenericTestUtils.waitFor(() -> {
return replicationActivityStatus.isReplicationEnabled();
}, 10, 1000*5);
assertTrue(replicationActivityStatus.isReplicationEnabled());
}
}

View File

@ -24,7 +24,7 @@
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.chillmode.ChillModeHandler;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationActivityStatus;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventQueue;
@ -48,10 +48,10 @@ public void setUp() throws Exception {
eventQueue = new EventQueue();
scmClientProtocolServer = new SCMClientProtocolServer(config, null);
BlockManager blockManager = Mockito.mock(BlockManagerImpl.class);
ReplicationActivityStatus replicationActivityStatus =
Mockito.mock(ReplicationActivityStatus.class);
ReplicationManager replicationManager =
Mockito.mock(ReplicationManager.class);
ChillModeHandler chillModeHandler = new ChillModeHandler(config,
scmClientProtocolServer, blockManager, replicationActivityStatus);
scmClientProtocolServer, blockManager, replicationManager);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, chillModeHandler);
}