diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index 318a94b63f..ea73a28599 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -242,4 +242,10 @@ static RetryPolicy createRetryPolicy(Configuration conf) { .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration); return retryPolicy; } + + static Long getMinReplicatedIndex( + Collection<RaftProtos.CommitInfoProto> commitInfos) { + return commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex) + .min(Long::compareTo).orElse(null); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java index ec70dbd96e..e6858cd039 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerData.java @@ -544,4 +544,9 @@ public void computeAndSetChecksum(Yaml yaml) throws IOException { * @return Protocol Buffer Message */ public abstract ContainerProtos.ContainerDataProto getProtoBufMessage(); + + /** + * Returns the blockCommitSequenceId. + */ + public abstract long getBlockCommitSequenceId(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java index 83f59ae2e8..266371dd8e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java @@ -27,16 +27,15 @@ import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.hadoop.ozone.container.common - .interfaces.ContainerDeletionChoosingPolicy; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Iterator; -import java.util.List; import java.util.Set; +import java.util.List; +import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -165,6 +164,10 @@ public Map<Long, Container> getContainerMapCopy() { return ImmutableMap.copyOf(containerMap); } + public Map<Long, Container> getContainerMap() { + return Collections.unmodifiableMap(containerMap); + } + /** * A simple interface for container Iterations. *

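Review note on the RatisHelper hunk above: getMinReplicatedIndex reduces the per-member commit infos of a Raft group to the smallest commit index, i.e. the log position that the slowest pipeline member has replicated, returning null when no commit info is available. A minimal standalone sketch of the same reduction, with made-up indices standing in for real CommitInfoProto values:

    import java.util.Arrays;
    import java.util.Collection;

    public class MinReplicatedIndexSketch {
      public static void main(String[] args) {
        // Hypothetical commit indices reported by the leader and two followers.
        Collection<Long> commitIndices = Arrays.asList(120L, 118L, 95L);
        // Same reduction as getMinReplicatedIndex: the minimum is what the
        // slowest member has replicated; null when the collection is empty.
        Long min = commitIndices.stream().min(Long::compareTo).orElse(null);
        System.out.println("min replicated index = " + min); // prints 95
      }
    }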
@@ -232,18 +235,6 @@ public ContainerReportsProto getContainerReport() throws IOException { return crBuilder.build(); } - public List<ContainerData> chooseContainerForBlockDeletion(int count, - ContainerDeletionChoosingPolicy deletionPolicy) - throws StorageContainerException { - Map<Long, ContainerData> containerDataMap = containerMap.entrySet().stream() - .filter(e -> deletionPolicy.isValidContainerType( - e.getValue().getContainerType())) - .collect(Collectors.toMap(Map.Entry::getKey, - e -> e.getValue().getContainerData())); - return deletionPolicy - .chooseContainerForBlockDeletion(count, containerDataMap); - } - public Set<Long> getMissingContainerSet() { return missingContainerSet; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java index 5163d9851a..af854ec3d6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type; @@ -81,6 +82,11 @@ public CommandHandler getCloseContainerHandler() { return handlerMap.get(Type.closeContainerCommand); } + @VisibleForTesting + public CommandHandler getDeleteBlocksCommandHandler() { + return handlerMap.get(Type.deleteBlocksCommand); + } + /** * Dispatch the command to the correct handler. 
* diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java index ccf57cb2f3..104a43368a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java @@ -171,6 +171,11 @@ public long getNumReadStateMachineMissCount() { return numReadStateMachineMissCount.value(); } + @VisibleForTesting + public long getNumReadStateMachineOps() { + return numReadStateMachineOps.value(); + } + @VisibleForTesting public long getNumBytesWrittenCount() { return numBytesWrittenCount.value(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index aadec8dcd7..4f876bc020 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -518,7 +519,8 @@ public CompletableFuture query(Message request) { } private ByteString readStateMachineData( - ContainerCommandRequestProto requestProto, long term, long index) { + ContainerCommandRequestProto requestProto, long term, long index) + throws IOException { // the stateMachine data is not present in the stateMachine cache, // increment the stateMachine cache miss count metrics.incNumReadStateMachineMissCount(); @@ -532,18 +534,24 @@ private ByteString readStateMachineData( .setChunkData(chunkInfo); ContainerCommandRequestProto dataContainerCommandProto = ContainerCommandRequestProto.newBuilder(requestProto) - .setCmdType(Type.ReadChunk) - .setReadChunk(readChunkRequestProto) + .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto) .build(); DispatcherContext context = - new DispatcherContext.Builder() - .setTerm(term) - .setLogIndex(index) - .setReadFromTmpFile(true) - .build(); + new DispatcherContext.Builder().setTerm(term).setLogIndex(index) + .setReadFromTmpFile(true).build(); // read the chunk ContainerCommandResponseProto response = dispatchCommand(dataContainerCommandProto, context); + if (response.getResult() != ContainerProtos.Result.SUCCESS) { + StorageContainerException sce = + new StorageContainerException(response.getMessage(), + response.getResult()); + LOG.error("gid {} : ReadStateMachine failed. 
cmd {} logIndex {} msg : " + + "{} Container Result: {}", gid, response.getCmdType(), index, + response.getMessage(), response.getResult()); + throw sce; + } + ReadChunkResponseProto responseProto = response.getReadChunk(); ByteString data = responseProto.getData(); @@ -746,7 +754,8 @@ private static CompletableFuture completeExceptionally(Exception e) { return future; } - private void evictStateMachineCache() { + @VisibleForTesting + public void evictStateMachineCache() { stateMachineDataCache.invalidateAll(); stateMachineDataCache.cleanUp(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index b4021cf657..a5cbbff3cb 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -22,15 +22,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineReport; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; -import org.apache.hadoop.hdds.protocol.proto - .StorageContainerDatanodeProtocolProtos.PipelineAction; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.x509.SecurityConfig; @@ -50,14 +46,7 @@ import org.apache.ratis.grpc.GrpcFactory; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.netty.NettyConfigKeys; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientReply; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.NotLeaderException; -import org.apache.ratis.protocol.StateMachineException; -import org.apache.ratis.protocol.RaftPeerId; -import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.*; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; @@ -74,11 +63,11 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Collections; import java.util.UUID; +import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -139,10 +128,10 @@ private XceiverServerRatis(DatanodeDetails dd, int port, 
TimeUnit.MILLISECONDS); this.dispatcher = dispatcher; - RaftServer.Builder builder = RaftServer.newBuilder() - .setServerId(RatisHelper.toRaftPeerId(dd)) - .setProperties(serverProperties) - .setStateMachineRegistry(this::getStateMachine); + RaftServer.Builder builder = + RaftServer.newBuilder().setServerId(RatisHelper.toRaftPeerId(dd)) + .setProperties(serverProperties) + .setStateMachineRegistry(this::getStateMachine); if (tlsConfig != null) { builder.setParameters(GrpcFactory.newRaftParameters(tlsConfig)); } @@ -507,6 +496,13 @@ private RaftClientRequest createRaftClientRequest( null); } + private GroupInfoRequest createGroupInfoRequest( + HddsProtos.PipelineID pipelineID) { + return new GroupInfoRequest(clientId, server.getId(), + RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()), + nextCallId()); + } + private void handlePipelineFailure(RaftGroupId groupId, RoleInfoProto roleInfoProto) { String msg; @@ -654,4 +650,12 @@ public void handleNodeLogFailure(RaftGroupId groupId, Throwable t) { triggerPipelineClose(groupId, msg, ClosePipelineInfo.Reason.PIPELINE_LOG_FAILED, true); } + + public long getMinReplicatedIndex(PipelineID pipelineID) throws IOException { + Long minIndex; + GroupInfoReply reply = getServer() + .getGroupInfo(createGroupInfoRequest(pipelineID.getProtobuf())); + minIndex = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos()); + return minIndex == null ? -1 : minIndex.longValue(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 10f94dc6ea..a65b5be38d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -25,7 +25,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; @@ -76,8 +75,6 @@ import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl; import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager; import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager; -import org.apache.hadoop.ozone.container.keyvalue.statemachine.background - .BlockDeletingService; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.ReflectionUtils; @@ -86,15 +83,8 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import static org.apache.hadoop.hdds.HddsConfigKeys .HDDS_DATANODE_VOLUME_CHOOSING_POLICY; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos. 
+ Result.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -109,7 +99,6 @@ public class KeyValueHandler extends Handler { private final ContainerType containerType; private final BlockManager blockManager; private final ChunkManager chunkManager; - private final BlockDeletingService blockDeletingService; private final VolumeChoosingPolicy volumeChoosingPolicy; private final long maxContainerSize; @@ -126,18 +115,6 @@ public KeyValueHandler(Configuration config, StateContext context, conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY, OzoneConfigKeys.DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT); chunkManager = new ChunkManagerImpl(doSyncWrite); - long svcInterval = config - .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, - OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - long serviceTimeout = config - .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, - OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - this.blockDeletingService = - new BlockDeletingService(containerSet, svcInterval, serviceTimeout, - TimeUnit.MILLISECONDS, config); - blockDeletingService.start(); volumeChoosingPolicy = ReflectionUtils.newInstance(conf.getClass( HDDS_DATANODE_VOLUME_CHOOSING_POLICY, RoundRobinVolumeChoosingPolicy .class, VolumeChoosingPolicy.class), conf); @@ -160,7 +137,6 @@ public VolumeChoosingPolicy getVolumeChoosingPolicyForTesting() { @Override public void stop() { - blockDeletingService.shutdown(); } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java index e4814cb26f..1b7029882f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java @@ -207,7 +207,8 @@ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info, // In case the chunk file does not exist but tmp chunk file exist, // read from tmp chunk file if readFromTmpFile is set to true - if (!chunkFile.exists() && dispatcherContext.isReadFromTmpFile()) { + if (!chunkFile.exists() && dispatcherContext != null + && dispatcherContext.isReadFromTmpFile()) { chunkFile = getTmpChunkFile(chunkFile, dispatcherContext); } data = ChunkUtils.readData(chunkFile, info, volumeIOStats); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java index 4e02892dd0..25c00c39d5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java @@ -20,12 +20,14 @@ import com.google.common.collect.Lists; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.ozone.container.common.impl.ContainerData; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy; 
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.util.ReflectionUtils; import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; @@ -48,10 +50,13 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.apache.hadoop.ozone.OzoneConfigKeys .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL; @@ -67,12 +72,12 @@ * of deleting staled ozone blocks. */ // TODO: Fix BlockDeletingService to work with new StorageLayer -public class BlockDeletingService extends BackgroundService{ +public class BlockDeletingService extends BackgroundService { private static final Logger LOG = LoggerFactory.getLogger(BlockDeletingService.class); - private ContainerSet containerSet; + private OzoneContainer ozoneContainer; private ContainerDeletionChoosingPolicy containerDeletionPolicy; private final Configuration conf; @@ -88,22 +93,23 @@ public class BlockDeletingService extends BackgroundService{ // Core pool size for container tasks private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 10; - public BlockDeletingService(ContainerSet containerSet, long serviceInterval, - long serviceTimeout, TimeUnit timeUnit, Configuration conf) { + public BlockDeletingService(OzoneContainer ozoneContainer, + long serviceInterval, long serviceTimeout, TimeUnit timeUnit, + Configuration conf) { super("BlockDeletingService", serviceInterval, timeUnit, BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout); - this.containerSet = containerSet; + this.ozoneContainer = ozoneContainer; containerDeletionPolicy = ReflectionUtils.newInstance(conf.getClass( ScmConfigKeys.OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY, TopNOrderedContainerDeletionChoosingPolicy.class, ContainerDeletionChoosingPolicy.class), conf); this.conf = conf; - this.blockLimitPerTask = conf.getInt( - OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, - OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT); - this.containerLimitPerInterval = conf.getInt( - OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, - OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT); + this.blockLimitPerTask = + conf.getInt(OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER, + OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER_DEFAULT); + this.containerLimitPerInterval = + conf.getInt(OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL, + OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT); } @@ -117,8 +123,8 @@ public BackgroundTaskQueue getTasks() { // We must ensure there is no empty container in this result. // The chosen result depends on what container deletion policy is // configured. 
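Review note: the isDeletionAllowed predicate added in the hunk below is the core of this patch. Blocks of a closed container may only be purged once every pipeline member has replicated the log at least up to the container's blockCommitSequenceId (BCSID), since the close transaction sits at that index. A condensed sketch of the rule, assuming the accessors this patch introduces (isExist, getMinReplicatedIndex, getBlockCommitSequenceId), with the container-type check and the IOException fallback elided; a negative min index stands for "no commit info available":

    import java.io.IOException;
    import java.util.UUID;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
    import org.apache.hadoop.ozone.container.common.impl.ContainerData;
    import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;

    final class DeletionGateSketch {
      // Condensed form of the gating logic below; not a drop-in replacement.
      static boolean deletionAllowed(ContainerData data, XceiverServerRatis ratis)
          throws IOException {
        if (!data.isClosed()) {
          return false; // never delete blocks from an open container
        }
        PipelineID id =
            PipelineID.valueOf(UUID.fromString(data.getOriginPipelineId()));
        if (!ratis.isExist(id.getProtobuf())) {
          return true; // Raft group is gone, no follower is left to catch up
        }
        long minReplicated = ratis.getMinReplicatedIndex(id);
        long bcsId = data.getBlockCommitSequenceId();
        // Safe only once the slowest member has applied the close transaction.
        return minReplicated < 0 || minReplicated >= bcsId;
      }
    }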
- containers = containerSet.chooseContainerForBlockDeletion( - containerLimitPerInterval, containerDeletionPolicy); + containers = chooseContainerForBlockDeletion(containerLimitPerInterval, + containerDeletionPolicy); if (containers.size() > 0) { LOG.info("Plan to choose {} containers for block deletion, " + "actually returns {} valid containers.", @@ -143,6 +149,64 @@ public BackgroundTaskQueue getTasks() { return queue; } + public List<ContainerData> chooseContainerForBlockDeletion(int count, + ContainerDeletionChoosingPolicy deletionPolicy) + throws StorageContainerException { + Map<Long, ContainerData> containerDataMap = + ozoneContainer.getContainerSet().getContainerMap().entrySet().stream() + .filter(e -> isDeletionAllowed(e.getValue().getContainerData(), + deletionPolicy)).collect(Collectors + .toMap(Map.Entry::getKey, e -> e.getValue().getContainerData())); + return deletionPolicy + .chooseContainerForBlockDeletion(count, containerDataMap); + } + + private boolean isDeletionAllowed(ContainerData containerData, + ContainerDeletionChoosingPolicy deletionPolicy) { + if (!deletionPolicy + .isValidContainerType(containerData.getContainerType())) { + return false; + } else if (!containerData.isClosed()) { + return false; + } else { + if (ozoneContainer.getWriteChannel() instanceof XceiverServerRatis) { + XceiverServerRatis ratisServer = + (XceiverServerRatis) ozoneContainer.getWriteChannel(); + PipelineID pipelineID = PipelineID + .valueOf(UUID.fromString(containerData.getOriginPipelineId())); + // in case the ratis group does not exist, just mark it for deletion. + if (!ratisServer.isExist(pipelineID.getProtobuf())) { + return true; + } + try { + long minReplicatedIndex = + ratisServer.getMinReplicatedIndex(pipelineID); + long containerBCSID = containerData.getBlockCommitSequenceId(); + if (minReplicatedIndex >= 0 && minReplicatedIndex < containerBCSID) { + LOG.warn("Close Container log Index {} is not replicated across all " + + "the servers in the pipeline {} as the min replicated " + + "index is {}. 
Deletion is not allowed in this container " + + "yet.", containerBCSID, + containerData.getOriginPipelineId(), minReplicatedIndex); + return false; + } else { + return true; + } + } catch (IOException ioe) { + // in case of any exception check again whether the pipeline exists + // and in case the pipeline got destroyed, just mark it for deletion + if (!ratisServer.isExist(pipelineID.getProtobuf())) { + return true; + } else { + LOG.info(ioe.getMessage()); + return false; + } + } + } + return true; + } + } + private static class ContainerBackgroundTaskResult implements BackgroundTaskResult { private List<String> deletedBlockIds; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index e2965501d2..d6e4588a8a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -42,6 +42,7 @@ import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; +import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService; import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; import org.apache.hadoop.ozone.container.replication .OnDemandContainerReplicationSource; @@ -53,6 +54,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone.OzoneConfigKeys.*; /** * Ozone main class sets up the network servers and initializes the container @@ -72,6 +76,7 @@ public class OzoneContainer { private final XceiverServerSpi readChannel; private final ContainerController controller; private ContainerScrubber scrubber; + private final BlockDeletingService blockDeletingService; /** * Construct OzoneContainer object. 
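Review note on the OzoneContainer changes here and the BlockDeletingService changes above: the service now takes the whole OzoneContainer rather than just the ContainerSet precisely so it can reach the write channel and ask Ratis how far the group has replicated. The query path added to XceiverServerRatis earlier in this patch boils down to the following condensed sketch (clientId/callId plumbing as in the patch, error handling elided):

    import java.io.IOException;
    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.ratis.RatisHelper;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
    import org.apache.ratis.protocol.ClientId;
    import org.apache.ratis.protocol.GroupInfoReply;
    import org.apache.ratis.protocol.GroupInfoRequest;
    import org.apache.ratis.protocol.RaftGroupId;
    import org.apache.ratis.server.RaftServer;

    final class MinReplicatedIndexQuerySketch {
      // Condensed view of XceiverServerRatis#getMinReplicatedIndex.
      static long query(RaftServer server, ClientId clientId, long callId,
          HddsProtos.PipelineID pipelineID) throws IOException {
        GroupInfoRequest req = new GroupInfoRequest(clientId, server.getId(),
            RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()),
            callId);
        GroupInfoReply reply = server.getGroupInfo(req);
        Long min = RatisHelper.getMinReplicatedIndex(reply.getCommitInfos());
        return min == null ? -1 : min.longValue(); // -1 signals "no commit info"
      }
    }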
@@ -111,7 +116,17 @@ public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration this.readChannel = new XceiverServerGrpc( datanodeDetails, config, hddsDispatcher, certClient, createReplicationService()); - + long svcInterval = config + .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + long serviceTimeout = config + .getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_TIMEOUT, + OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + this.blockDeletingService = + new BlockDeletingService(this, svcInterval, serviceTimeout, + TimeUnit.MILLISECONDS, config); } private GrpcReplicationService createReplicationService() { @@ -189,6 +204,7 @@ public void start(String scmId) throws IOException { readChannel.start(); hddsDispatcher.init(); hddsDispatcher.setScmId(scmId); + blockDeletingService.start(); } /** @@ -203,6 +219,7 @@ public void stop() { this.handlers.values().forEach(Handler::stop); hddsDispatcher.shutdown(); volumeSet.shutdown(); + blockDeletingService.shutdown(); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java index 115b5e2cf8..a136983415 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/testutils/BlockDeletingServiceTestImpl.java @@ -19,9 +19,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.keyvalue.statemachine.background .BlockDeletingService; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -42,9 +42,9 @@ public class BlockDeletingServiceTestImpl private Thread testingThread; private AtomicInteger numOfProcessed = new AtomicInteger(0); - public BlockDeletingServiceTestImpl(ContainerSet containerSet, + public BlockDeletingServiceTestImpl(OzoneContainer container, int serviceInterval, Configuration conf) { - super(containerSet, serviceInterval, SERVICE_TIMEOUT_IN_MILLISECONDS, + super(container, serviceInterval, SERVICE_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS, conf); } @@ -67,7 +67,8 @@ public int getTimesOfProcessed() { } // Override the implementation to start a single on-call control thread. - @Override public void start() { + @Override + public void start() { PeriodicalTask svc = new PeriodicalTask(); // In test mode, relies on a latch countdown to runDeletingTasks tasks. Runnable r = () -> { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java new file mode 100644 index 0000000000..30c2624fbf --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.ozone.client.rpc; + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.XceiverClientManager; +import org.apache.hadoop.hdds.scm.XceiverClientSpi; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.pipeline.Pipeline; +import org.apache.hadoop.ozone.HddsDatanodeService; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.KeyOutputStream; +import org.apache.hadoop.ozone.client.io.OzoneOutputStream; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; + +/** + * Tests delete key operation with a slow follower in the datanode + * pipeline. + */ +public class TestDeleteWithSlowFollower { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + private static OzoneClient client; + private static ObjectStore objectStore; + private static String volumeName; + private static String bucketName; + private static String path; + private static XceiverClientManager xceiverClientManager; + + /** + * Create a MiniOzoneCluster for testing. 
+ * + * @throws IOException + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + path = GenericTestUtils + .getTempPath(TestContainerStateMachineFailures.class.getSimpleName()); + File baseDir = new File(path); + baseDir.mkdirs(); + + conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200, + TimeUnit.MILLISECONDS); + // Make the stale, dead and server failure timeout higher so that a dead + // node is not detected at SCM as well as the pipeline close action + // never gets initiated early at Datanode in the test. + conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200, + TimeUnit.MILLISECONDS); + conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 1000, TimeUnit.SECONDS); + conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL, 2000, + TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 1000, + TimeUnit.SECONDS); + conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY, + 1000, TimeUnit.SECONDS); + conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL, + 1, TimeUnit.SECONDS); + + conf.setQuietMode(false); + cluster = + MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100) + .build(); + cluster.waitForClusterToBeReady(); + // the easiest way to create an open container is creating a key + client = OzoneClientFactory.getClient(conf); + objectStore = client.getObjectStore(); + xceiverClientManager = new XceiverClientManager(conf); + volumeName = "testcontainerstatemachinefailures"; + bucketName = volumeName; + objectStore.createVolume(volumeName); + objectStore.getVolume(volumeName).createBucket(bucketName); + } + + /** + * Shutdown MiniOzoneCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * The test simulates a slow follower by first writing a key, thereby creating + * a container on 3 dns of the cluster. Then, a dn is shut down and a close + * container cmd gets issued so that the container gets closed on the leader + * and the alive follower. The key is then deleted and + * the node is started up again so that it + * rejoins the ring and starts applying the transactions from where it left + * off by fetching the entries from the leader. Until this follower + * catches up and its replica gets closed, + * the data is not deleted from any of the nodes which have the + * closed replica. + */ + @Test + public void testDeleteKeyWithSlowFollower() throws Exception { + + OzoneOutputStream key = + objectStore.getVolume(volumeName).getBucket(bucketName) + .createKey("ratis", 0, ReplicationType.RATIS, + ReplicationFactor.THREE, new HashMap<>()); + byte[] testData = "ratis".getBytes(); + // First write and flush creates a container in the datanode + key.write(testData); + key.flush(); + + KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream(); + List<OmKeyLocationInfo> locationInfoList = + groupOutputStream.getLocationInfoList(); + Assert.assertEquals(1, locationInfoList.size()); + OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0); + long containerID = omKeyLocationInfo.getContainerID(); + // A container is created on the datanode. Now figure out a follower node to + // kill/slow down. 
+ HddsDatanodeService follower = null; + HddsDatanodeService leader = null; + + List<Pipeline> pipelineList = + cluster.getStorageContainerManager().getPipelineManager() + .getPipelines(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + Assert.assertTrue(pipelineList.size() == 1); + Pipeline pipeline = pipelineList.get(0); + for (HddsDatanodeService dn : cluster.getHddsDatanodes()) { + if (ContainerTestHelper.isRatisFollower(dn, pipeline)) { + follower = dn; + } else if (ContainerTestHelper.isRatisLeader(dn, pipeline)) { + leader = dn; + } + } + Assert.assertNotNull(follower); + Assert.assertNotNull(leader); + // shutdown the slow follower + cluster.shutdownHddsDatanode(follower.getDatanodeDetails()); + key.write(testData); + key.close(); + + // now move the container to the closed state on the datanode. + XceiverClientSpi xceiverClient = + xceiverClientManager.acquireClient(pipeline); + ContainerProtos.ContainerCommandRequestProto.Builder request = + ContainerProtos.ContainerCommandRequestProto.newBuilder(); + request.setDatanodeUuid(pipeline.getFirstNode().getUuidString()); + request.setCmdType(ContainerProtos.Type.CloseContainer); + request.setContainerID(containerID); + request.setCloseContainer( + ContainerProtos.CloseContainerRequestProto.getDefaultInstance()); + xceiverClient.sendCommand(request.build()); + + ContainerStateMachine stateMachine = + (ContainerStateMachine) ContainerTestHelper + .getStateMachine(leader, pipeline); + OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName). + setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS) + .setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName("ratis") + .build(); + OmKeyInfo info = cluster.getOzoneManager().lookupKey(keyArgs); + BlockID blockID = + info.getKeyLocationVersions().get(0).getLocationList().get(0) + .getBlockID(); + OzoneContainer ozoneContainer; + final DatanodeStateMachine dnStateMachine = + leader.getDatanodeStateMachine(); + ozoneContainer = dnStateMachine.getContainer(); + KeyValueHandler keyValueHandler = + (KeyValueHandler) ozoneContainer.getDispatcher() + .getHandler(ContainerProtos.ContainerType.KeyValueContainer); + Container container = + ozoneContainer.getContainerSet().getContainer(blockID.getContainerID()); + KeyValueContainerData containerData = + ((KeyValueContainerData) container.getContainerData()); + long delTrxId = containerData.getDeleteTransactionId(); + long numPendingDeletionBlocks = containerData.getNumPendingDeletionBlocks(); + BlockData blockData = + keyValueHandler.getBlockManager().getBlock(container, blockID); + cluster.getOzoneManager().deleteKey(keyArgs); + GenericTestUtils.waitFor(() -> { + return + dnStateMachine.getCommandDispatcher().getDeleteBlocksCommandHandler() + .getInvocationCount() >= 1; + }, 500, 100000); + Assert.assertTrue(containerData.getDeleteTransactionId() > delTrxId); + Assert.assertTrue( + containerData.getNumPendingDeletionBlocks() > numPendingDeletionBlocks); + // make sure the chunk was never deleted on the leader even though + // deleteBlock handler is invoked + try { + for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) { + keyValueHandler.getChunkManager() + .readChunk(container, blockID, ChunkInfo.getFromProtoBuf(chunkInfo), + null); + } + } catch (IOException ioe) { + Assert.fail("Exception should not be thrown."); + + } + long numReadStateMachineOps = + stateMachine.getMetrics().getNumReadStateMachineOps(); + Assert.assertTrue( + stateMachine.getMetrics().getNumReadStateMachineFails() == 0); + 
stateMachine.evictStateMachineCache(); + cluster.restartHddsDatanode(follower.getDatanodeDetails(), false); + // wait for the raft server to come up and join the ratis ring + Thread.sleep(10000); + + // Make sure the readStateMachine call got triggered after the follower + // caught up + Assert.assertTrue(stateMachine.getMetrics().getNumReadStateMachineOps() + > numReadStateMachineOps); + Assert.assertTrue( + stateMachine.getMetrics().getNumReadStateMachineFails() == 0); + // wait for the chunk to get deleted now + Thread.sleep(10000); + for (HddsDatanodeService dn : cluster.getHddsDatanodes()) { + keyValueHandler = + (KeyValueHandler) dn.getDatanodeStateMachine().getContainer() + .getDispatcher() + .getHandler(ContainerProtos.ContainerType.KeyValueContainer); + // make sure the chunk is now deleted on the all dns + try { + for (ContainerProtos.ChunkInfo chunkInfo : blockData.getChunks()) { + keyValueHandler.getChunkManager().readChunk(container, blockID, + ChunkInfo.getFromProtoBuf(chunkInfo), null); + } + Assert.fail("Expected exception is not thrown"); + } catch (IOException ioe) { + Assert.assertTrue(ioe instanceof StorageContainerException); + Assert.assertTrue(((StorageContainerException) ioe).getResult() + == ContainerProtos.Result.UNABLE_TO_FIND_CHUNK); + } + } + + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 82d34d74e1..313a3c094f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.ratis.RatisHelper; import org.apache.hadoop.hdds.scm.container.ContainerID; import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; @@ -873,13 +874,33 @@ public static void waitForContainerClose(MiniOzoneCluster cluster, public static StateMachine getStateMachine(MiniOzoneCluster cluster) throws Exception { - XceiverServerSpi server = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(). - getContainer().getWriteChannel(); + return getStateMachine(cluster.getHddsDatanodes().get(0), null); + } + + private static RaftServerImpl getRaftServerImpl(HddsDatanodeService dn, + Pipeline pipeline) throws Exception { + XceiverServerSpi server = dn.getDatanodeStateMachine(). + getContainer().getWriteChannel(); RaftServerProxy proxy = (RaftServerProxy) (((XceiverServerRatis) server).getServer()); - RaftGroupId groupId = proxy.getGroupIds().iterator().next(); - RaftServerImpl impl = proxy.getImpl(groupId); - return impl.getStateMachine(); + RaftGroupId groupId = + pipeline == null ? 
proxy.getGroupIds().iterator().next() : + RatisHelper.newRaftGroup(pipeline).getGroupId(); + return proxy.getImpl(groupId); + } + + public static StateMachine getStateMachine(HddsDatanodeService dn, + Pipeline pipeline) throws Exception { + return getRaftServerImpl(dn, pipeline).getStateMachine(); + } + + public static boolean isRatisLeader(HddsDatanodeService dn, Pipeline pipeline) + throws Exception { + return getRaftServerImpl(dn, pipeline).isLeader(); + } + + public static boolean isRatisFollower(HddsDatanodeService dn, + Pipeline pipeline) throws Exception { + return getRaftServerImpl(dn, pipeline).isFollower(); } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java index a85a117d52..e188effa13 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/TestBlockDeletingService.java @@ -36,6 +36,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.testutils.BlockDeletingServiceTestImpl; import org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.statemachine.background @@ -50,6 +51,7 @@ import org.junit.Assert; import org.junit.Test; import org.junit.BeforeClass; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +115,7 @@ private void createToDeleteBlocks(ContainerSet containerSet, KeyValueContainerData data = new KeyValueContainerData(containerID, ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(), UUID.randomUUID().toString()); + data.closeContainer(); Container container = new KeyValueContainer(data, conf); container.create(new VolumeSet(scmId, clusterID, conf), new RoundRobinVolumeChoosingPolicy(), scmId); @@ -196,7 +199,7 @@ public void testBlockDeletion() throws Exception { createToDeleteBlocks(containerSet, conf, 1, 3, 1); BlockDeletingServiceTestImpl svc = - new BlockDeletingServiceTestImpl(containerSet, 1000, conf); + getBlockDeletingService(containerSet, conf, 1000); svc.start(); GenericTestUtils.waitFor(() -> svc.isStarted(), 100, 3000); @@ -256,7 +259,7 @@ public void testShutdownService() throws Exception { createToDeleteBlocks(containerSet, conf, 1, 100, 1); BlockDeletingServiceTestImpl service = - new BlockDeletingServiceTestImpl(containerSet, 1000, conf); + getBlockDeletingService(containerSet, conf, 1000); service.start(); GenericTestUtils.waitFor(() -> service.isStarted(), 100, 3000); @@ -285,7 +288,12 @@ public void testBlockDeletionTimeout() throws Exception { // set timeout value as 1ns to trigger timeout behavior long timeout = 1; - BlockDeletingService svc = new BlockDeletingService(containerSet, + OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class); + Mockito.when(ozoneContainer.getContainerSet()) + .thenReturn(containerSet); + Mockito.when(ozoneContainer.getWriteChannel()) + .thenReturn(null); + BlockDeletingService svc = new BlockDeletingService(ozoneContainer, 
TimeUnit.MILLISECONDS.toNanos(1000), timeout, TimeUnit.NANOSECONDS, conf); svc.start(); @@ -307,7 +315,7 @@ // test for normal case that doesn't have timeout limitation timeout = 0; createToDeleteBlocks(containerSet, conf, 1, 3, 1); - svc = new BlockDeletingService(containerSet, + svc = new BlockDeletingService(ozoneContainer, TimeUnit.MILLISECONDS.toNanos(1000), timeout, TimeUnit.MILLISECONDS, conf); svc.start(); @@ -338,6 +346,16 @@ svc.shutdown(); } + private BlockDeletingServiceTestImpl getBlockDeletingService( + ContainerSet containerSet, Configuration conf, int timeout) { + OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class); + Mockito.when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + Mockito.when(ozoneContainer.getWriteChannel()).thenReturn(null); + BlockDeletingServiceTestImpl service = + new BlockDeletingServiceTestImpl(ozoneContainer, timeout, conf); + return service; + } + @Test(timeout = 30000) public void testContainerThrottle() throws Exception { // Properties : @@ -360,7 +378,7 @@ public void testContainerThrottle() throws Exception { createToDeleteBlocks(containerSet, conf, 2, 1, 10); BlockDeletingServiceTestImpl service = - new BlockDeletingServiceTestImpl(containerSet, 1000, conf); + getBlockDeletingService(containerSet, conf, 1000); service.start(); try { @@ -410,9 +428,11 @@ public void testBlockThrottle() throws Exception { // Make sure chunks are created Assert.assertEquals(15, getNumberOfChunksInContainers(containerSet)); - + OzoneContainer ozoneContainer = Mockito.mock(OzoneContainer.class); + Mockito.when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + Mockito.when(ozoneContainer.getWriteChannel()).thenReturn(null); BlockDeletingServiceTestImpl service = - new BlockDeletingServiceTestImpl(containerSet, 1000, conf); + getBlockDeletingService(containerSet, conf, 1000); service.start(); try { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java index 4ebbf1c913..b872516474 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDeletionChoosingPolicy.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Random; import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomUtils; @@ -35,18 +36,26 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoosingPolicy; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.statemachine.background.BlockDeletingService; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; /** * The class for testing container deletion choosing policy. 
*/ public class TestContainerDeletionChoosingPolicy { private static String path; - private ContainerSet containerSet; + private OzoneContainer ozoneContainer; + private ContainerSet containerSet; private OzoneConfiguration conf; + private BlockDeletingService blockDeletingService; + // the service timeout + private static final int SERVICE_TIMEOUT_IN_MILLISECONDS = 0; + private static final int SERVICE_INTERVAL_IN_MILLISECONDS = 1000; @Before public void init() throws Throwable { @@ -75,23 +84,25 @@ public void testRandomChoosingPolicy() throws IOException { KeyValueContainerData data = new KeyValueContainerData(i, ContainerTestHelper.CONTAINER_MAX_SIZE, UUID.randomUUID().toString(), UUID.randomUUID().toString()); + data.closeContainer(); KeyValueContainer container = new KeyValueContainer(data, conf); containerSet.addContainer(container); Assert.assertTrue( containerSet.getContainerMapCopy() .containsKey(data.getContainerID())); } + blockDeletingService = getBlockDeletingService(); ContainerDeletionChoosingPolicy deletionPolicy = new RandomContainerDeletionChoosingPolicy(); List result0 = - containerSet.chooseContainerForBlockDeletion(5, deletionPolicy); + blockDeletingService.chooseContainerForBlockDeletion(5, deletionPolicy); Assert.assertEquals(5, result0.size()); // test random choosing - List result1 = containerSet + List result1 = blockDeletingService .chooseContainerForBlockDeletion(numContainers, deletionPolicy); - List result2 = containerSet + List result2 = blockDeletingService .chooseContainerForBlockDeletion(numContainers, deletionPolicy); boolean hasShuffled = false; @@ -137,18 +148,20 @@ public void testTopNOrderedChoosingPolicy() throws IOException { name2Count.put(containerId, deletionBlocks); } KeyValueContainer container = new KeyValueContainer(data, conf); + data.closeContainer(); containerSet.addContainer(container); Assert.assertTrue( containerSet.getContainerMapCopy().containsKey(containerId)); } + blockDeletingService = getBlockDeletingService(); ContainerDeletionChoosingPolicy deletionPolicy = new TopNOrderedContainerDeletionChoosingPolicy(); List result0 = - containerSet.chooseContainerForBlockDeletion(5, deletionPolicy); + blockDeletingService.chooseContainerForBlockDeletion(5, deletionPolicy); Assert.assertEquals(5, result0.size()); - List result1 = containerSet + List result1 = blockDeletingService .chooseContainerForBlockDeletion(numContainers + 1, deletionPolicy); // the empty deletion blocks container should not be chosen Assert.assertEquals(numContainers, result1.size()); @@ -164,4 +177,15 @@ public void testTopNOrderedChoosingPolicy() throws IOException { // ensure all the container data are compared Assert.assertEquals(0, name2Count.size()); } + + private BlockDeletingService getBlockDeletingService() { + ozoneContainer = Mockito.mock(OzoneContainer.class); + Mockito.when(ozoneContainer.getContainerSet()).thenReturn(containerSet); + Mockito.when(ozoneContainer.getWriteChannel()).thenReturn(null); + blockDeletingService = new BlockDeletingService(ozoneContainer, + SERVICE_INTERVAL_IN_MILLISECONDS, SERVICE_TIMEOUT_IN_MILLISECONDS, + TimeUnit.MILLISECONDS, conf); + return blockDeletingService; + + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index 26d16d505a..e9dfb101cc 100644 --- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; @@ -45,6 +46,7 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.helpers.OmKeyArgs; +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo; import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup; import org.apache.hadoop.ozone.ozShell.TestOzoneShell; import org.apache.hadoop.ozone.protocol.commands.RetriableDatanodeEventWatcher; @@ -172,6 +174,16 @@ public void testBlockDeletion() throws Exception { OzoneTestUtils.closeContainers(omKeyLocationInfoGroupList, scm); waitForDatanodeCommandRetry(); + + // make sure the containers are closed on the dn + omKeyLocationInfoGroupList.forEach((group) -> { + List locationInfo = group.getLocationList(); + locationInfo.forEach( + (info) -> cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() + .getContainer().getContainerSet() + .getContainer(info.getContainerID()).getContainerData() + .setState(ContainerProtos.ContainerDataProto.State.CLOSED)); + }); waitForDatanodeBlockDeletionStart(); // The blocks should be deleted in the DN. verifyBlocksDeleted(omKeyLocationInfoGroupList);
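Review note on test timing: where a deterministic signal exists, these tests poll for it with GenericTestUtils.waitFor(condition, checkEveryMillis, waitForMillis) instead of sleeping; the two remaining Thread.sleep(10000) calls in TestDeleteWithSlowFollower could arguably be converted to the same idiom once a suitable condition is exposed. For reference, a self-contained example of the polling pattern (the AtomicBoolean merely stands in for a real cluster condition):

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.hadoop.test.GenericTestUtils;

    public class WaitForSketch {
      public static void main(String[] args) throws Exception {
        AtomicBoolean done = new AtomicBoolean();
        new Thread(() -> done.set(true)).start();
        // Re-evaluate the condition every 500 ms; give up after 100 s.
        GenericTestUtils.waitFor(done::get, 500, 100000);
      }
    }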