diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 15e991a3e0..52ea3aa094 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; import org.apache.ratis.protocol.RaftGroupId; @@ -52,6 +53,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; @@ -95,6 +99,13 @@ * {@link #applyTransaction} need to be enforced in the StateMachine * implementation. For example, synchronization between writeChunk and * createContainer in {@link ContainerStateMachine}. + * + * PutKey is synchronized with WriteChunk operations, PutKey for a block is + * executed only after all the WriteChunk preceding the PutKey have finished. + * + * CloseContainer is synchronized with WriteChunk and PutKey operations, + * CloseContainer for a container is processed after all the preceding write + * operations for the container have finished. 
* */ public class ContainerStateMachine extends BaseStateMachine { static final Logger LOG = LoggerFactory.getLogger( @@ -105,15 +116,14 @@ public class ContainerStateMachine extends BaseStateMachine { private ThreadPoolExecutor chunkExecutor; private final ConcurrentHashMap> writeChunkFutureMap; - private final ConcurrentHashMap> - createContainerFutureMap; + private final ConcurrentHashMap stateMachineMap; - ContainerStateMachine(ContainerDispatcher dispatcher, + public ContainerStateMachine(ContainerDispatcher dispatcher, ThreadPoolExecutor chunkExecutor) { this.dispatcher = dispatcher; this.chunkExecutor = chunkExecutor; this.writeChunkFutureMap = new ConcurrentHashMap<>(); - this.createContainerFutureMap = new ConcurrentHashMap<>(); + this.stateMachineMap = new ConcurrentHashMap<>(); } @Override @@ -203,32 +213,6 @@ private Message runCommand(ContainerCommandRequestProto requestProto) { return dispatchCommand(requestProto)::toByteString; } - private CompletableFuture handleWriteChunk( - ContainerCommandRequestProto requestProto, long entryIndex) { - final WriteChunkRequestProto write = requestProto.getWriteChunk(); - long containerID = write.getBlockID().getContainerID(); - CompletableFuture future = - createContainerFutureMap.get(containerID); - CompletableFuture writeChunkFuture; - if (future != null) { - writeChunkFuture = future.thenApplyAsync( - v -> runCommand(requestProto), chunkExecutor); - } else { - writeChunkFuture = CompletableFuture.supplyAsync( - () -> runCommand(requestProto), chunkExecutor); - } - writeChunkFutureMap.put(entryIndex, writeChunkFuture); - return writeChunkFuture; - } - - private CompletableFuture handleCreateContainer( - ContainerCommandRequestProto requestProto) { - long containerID = requestProto.getContainerID(); - createContainerFutureMap. 
- computeIfAbsent(containerID, k -> new CompletableFuture<>()); - return CompletableFuture.completedFuture(() -> ByteString.EMPTY); - } - /* * writeStateMachineData calls are not synchronized with each other * and also with applyTransaction. @@ -239,15 +223,17 @@ public CompletableFuture writeStateMachineData(LogEntryProto entry) { final ContainerCommandRequestProto requestProto = getRequestProto(entry.getSmLogEntry().getStateMachineData()); Type cmdType = requestProto.getCmdType(); - switch (cmdType) { - case CreateContainer: - return handleCreateContainer(requestProto); - case WriteChunk: - return handleWriteChunk(requestProto, entry.getIndex()); - default: - throw new IllegalStateException("Cmd Type:" + cmdType - + " should not have state machine data"); + long containerId = requestProto.getContainerID(); + stateMachineMap + .computeIfAbsent(containerId, k -> new StateMachineHelper()); + CompletableFuture stateMachineFuture = + stateMachineMap.get(containerId) + .handleStateMachineData(requestProto, entry.getIndex()); + if (stateMachineFuture == null) { + throw new IllegalStateException( + "Cmd Type:" + cmdType + " should not have state machine data"); } + return stateMachineFuture; } catch (IOException e) { return completeExceptionally(e); } @@ -363,25 +349,13 @@ public CompletableFuture applyTransaction(TransactionContext trx) { try { ContainerCommandRequestProto requestProto = getRequestProto(trx.getSMLogEntry().getData()); - Type cmdType = requestProto.getCmdType(); - - if (cmdType == Type.WriteChunk) { - WriteChunkRequestProto write = requestProto.getWriteChunk(); - // the data field has already been removed in start Transaction - Preconditions.checkArgument(!write.hasData()); - CompletableFuture stateMachineFuture = - writeChunkFutureMap.remove(trx.getLogEntry().getIndex()); - return stateMachineFuture - .thenComposeAsync(v -> - CompletableFuture.completedFuture(runCommand(requestProto))); - } else { - Message message = runCommand(requestProto); - if 
(cmdType == Type.CreateContainer) { - long containerID = requestProto.getContainerID(); - createContainerFutureMap.remove(containerID).complete(message); - } - return CompletableFuture.completedFuture(message); - } + Preconditions.checkState(!HddsUtils.isReadOnly(requestProto)); + stateMachineMap.computeIfAbsent(requestProto.getContainerID(), + k -> new StateMachineHelper()); + long index = + trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex(); + return stateMachineMap.get(requestProto.getContainerID()) + .executeContainerCommand(requestProto, index); } catch (IOException e) { return completeExceptionally(e); } @@ -396,4 +370,239 @@ private static CompletableFuture completeExceptionally(Exception e) { @Override public void close() throws IOException { } + + /** + * Class to manage the future tasks for writeChunks. + */ + static class CommitChunkFutureMap { + private final ConcurrentHashMap> + block2ChunkMap = new ConcurrentHashMap<>(); + + synchronized int removeAndGetSize(long index) { + block2ChunkMap.remove(index); + return block2ChunkMap.size(); + } + + synchronized CompletableFuture add(long index, + CompletableFuture future) { + return block2ChunkMap.put(index, future); + } + + synchronized List> getAll() { + return new ArrayList<>(block2ChunkMap.values()); + } + } + + /** + * This class maintains maps and provides utilities to enforce synchronization + * among createContainer, writeChunk, putKey and closeContainer. 
+ */ + private class StateMachineHelper { + + private CompletableFuture createContainerFuture; + + // Map for maintaining all writeChunk futures mapped to blockId + private final ConcurrentHashMap + block2ChunkMap; + + // Map for putKey futures + private final ConcurrentHashMap> + blockCommitMap; + + StateMachineHelper() { + createContainerFuture = null; + block2ChunkMap = new ConcurrentHashMap<>(); + blockCommitMap = new ConcurrentHashMap<>(); + } + + // The following section handles writeStateMachineData transactions + // on a container + + // enqueue the create container future during writeStateMachineData + // so that the write stateMachine data phase of writeChunk waits for + // create container to finish. + private CompletableFuture handleCreateContainer() { + createContainerFuture = new CompletableFuture<>(); + return CompletableFuture.completedFuture(() -> ByteString.EMPTY); + } + + // This waits for create container to finish + private CompletableFuture handleWriteChunk( + ContainerCommandRequestProto requestProto, long entryIndex) { + CompletableFuture containerOpFuture; + + if (createContainerFuture != null) { + containerOpFuture = createContainerFuture + .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor); + } else { + containerOpFuture = CompletableFuture + .supplyAsync(() -> runCommand(requestProto), chunkExecutor); + } + writeChunkFutureMap.put(entryIndex, containerOpFuture); + return containerOpFuture; + } + + CompletableFuture handleStateMachineData( + final ContainerCommandRequestProto requestProto, long index) { + Type cmdType = requestProto.getCmdType(); + if (cmdType == Type.CreateContainer) { + return handleCreateContainer(); + } else if (cmdType == Type.WriteChunk) { + return handleWriteChunk(requestProto, index); + } else { + return null; + } + } + + // The following section handles applyTransaction transactions + // on a container + + private CompletableFuture handlePutKey( + ContainerCommandRequestProto requestProto) { + 
List> futureList = new ArrayList<>(); + long localId = + requestProto.getPutKey().getKeyData().getBlockID().getLocalID(); + // Need not wait for create container future here as it has already + // finished. + if (block2ChunkMap.get(localId) != null) { + futureList.addAll(block2ChunkMap.get(localId).getAll()); + } + CompletableFuture effectiveFuture = + runCommandAfterFutures(futureList, requestProto); + + CompletableFuture putKeyFuture = + effectiveFuture.thenApply(message -> { + blockCommitMap.remove(localId); + return message; + }); + blockCommitMap.put(localId, putKeyFuture); + return putKeyFuture; + } + + // Close Container should be executed only if all pending WriteType + // container cmds get executed. Transactions which can return a future + // are WriteChunk and PutKey. + private CompletableFuture handleCloseContainer( + ContainerCommandRequestProto requestProto) { + List> futureList = new ArrayList<>(); + + // No need to wait for create container future here as it should have + // already finished. 
+ block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll())); + futureList.addAll(blockCommitMap.values()); + + // There are pending write Chunk/PutKey type requests + // Queue this closeContainer request behind all these requests + CompletableFuture closeContainerFuture = + runCommandAfterFutures(futureList, requestProto); + + return closeContainerFuture.thenApply(message -> { + stateMachineMap.remove(requestProto.getContainerID()); + return message; + }); + } + + private CompletableFuture handleChunkCommit( + ContainerCommandRequestProto requestProto, long index) { + WriteChunkRequestProto write = requestProto.getWriteChunk(); + // the data field has already been removed in start Transaction + Preconditions.checkArgument(!write.hasData()); + CompletableFuture stateMachineFuture = + writeChunkFutureMap.remove(index); + CompletableFuture commitChunkFuture = stateMachineFuture + .thenComposeAsync(v -> CompletableFuture + .completedFuture(runCommand(requestProto))); + + long localId = requestProto.getWriteChunk().getBlockID().getLocalID(); + // Put the applyTransaction Future again to the Map. + // closeContainer should synchronize with this. + block2ChunkMap + .computeIfAbsent(localId, id -> new CommitChunkFutureMap()) + .add(index, commitChunkFuture); + return commitChunkFuture.thenApply(message -> { + block2ChunkMap.computeIfPresent(localId, (containerId, chunks) + -> chunks.removeAndGetSize(index) == 0? 
null: chunks); + return message; + }); + } + + private CompletableFuture runCommandAfterFutures( + List> futureList, + ContainerCommandRequestProto requestProto) { + CompletableFuture effectiveFuture; + if (futureList.isEmpty()) { + effectiveFuture = CompletableFuture + .supplyAsync(() -> runCommand(requestProto)); + + } else { + CompletableFuture allFuture = CompletableFuture.allOf( + futureList.toArray(new CompletableFuture[futureList.size()])); + effectiveFuture = allFuture + .thenApplyAsync(v -> runCommand(requestProto)); + } + return effectiveFuture; + } + + CompletableFuture handleCreateContainer( + ContainerCommandRequestProto requestProto) { + CompletableFuture future = + CompletableFuture.completedFuture(runCommand(requestProto)); + future.thenAccept(m -> { + createContainerFuture.complete(m); + createContainerFuture = null; + }); + return future; + } + + CompletableFuture handleOtherCommands( + ContainerCommandRequestProto requestProto) { + return CompletableFuture.completedFuture(runCommand(requestProto)); + } + + CompletableFuture executeContainerCommand( + ContainerCommandRequestProto requestProto, long index) { + Type cmdType = requestProto.getCmdType(); + switch (cmdType) { + case WriteChunk: + return handleChunkCommit(requestProto, index); + case CloseContainer: + return handleCloseContainer(requestProto); + case PutKey: + return handlePutKey(requestProto); + case CreateContainer: + return handleCreateContainer(requestProto); + default: + return handleOtherCommands(requestProto); + } + } + } + + @VisibleForTesting + public ConcurrentHashMap getStateMachineMap() { + return stateMachineMap; + } + + @VisibleForTesting + public CompletableFuture getCreateContainerFuture(long containerId) { + StateMachineHelper helper = stateMachineMap.get(containerId); + return helper == null ? 
null : helper.createContainerFuture; + } + + @VisibleForTesting + public List> getCommitChunkFutureMap( + long containerId) { + StateMachineHelper helper = stateMachineMap.get(containerId); + if (helper != null) { + List> futureList = new ArrayList<>(); + stateMachineMap.get(containerId).block2ChunkMap.values() + .forEach(b -> futureList.addAll(b.getAll())); + return futureList; + } + return null; + } + + @VisibleForTesting + public Collection> getWriteChunkFutureMap() { + return writeChunkFutureMap.values(); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java new file mode 100644 index 0000000000..448742eb23 --- /dev/null +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ozone.container.server; + + +import org.apache.hadoop.hdds.client.BlockID; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandResponseProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerCommandRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ContainerType; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.ozone.container.ContainerTestHelper; +import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; +import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.common.transport.server.ratis + .ContainerStateMachine; +import org.apache.ratis.RatisHelper; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.util.ProtoUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Random; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This class tests ContainerStateMachine. 
+ */ +public class TestContainerStateMachine { + + private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); + + private static long nextCallId() { + return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; + } + + private ThreadPoolExecutor executor = + new ThreadPoolExecutor(4, 4, 100, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(1024), + new ThreadPoolExecutor.CallerRunsPolicy()); + private ContainerStateMachine stateMachine = + new ContainerStateMachine(new TestContainerDispatcher(), executor); + + + @Test + public void testCloseContainerSynchronization() throws Exception { + Pipeline pipeline = ContainerTestHelper.createPipeline(3); + long containerId = new Random().nextLong(); + + //create container request + RaftClientRequest createContainer = getRaftClientRequest( + ContainerTestHelper.getCreateContainerRequest(containerId, pipeline)); + + ContainerCommandRequestProto writeChunkProto = ContainerTestHelper + .getWriteChunkRequest(pipeline, new BlockID(containerId, nextCallId()), + 1024); + + RaftClientRequest writeChunkRequest = getRaftClientRequest(writeChunkProto); + + // add putKey request + ContainerCommandRequestProto putKeyProto = ContainerTestHelper + .getPutKeyRequest(pipeline, writeChunkProto.getWriteChunk()); + RaftClientRequest putKeyRequest = getRaftClientRequest(putKeyProto); + + TransactionContext createContainerCtxt = + startAndWriteStateMachineData(createContainer); + // Start and Write into the StateMachine + TransactionContext writeChunkcontext = + startAndWriteStateMachineData(writeChunkRequest); + + TransactionContext putKeyContext = + stateMachine.startTransaction(putKeyRequest); + Assert.assertEquals(1, stateMachine.getStateMachineMap().size()); + Assert.assertNotNull(stateMachine.getCreateContainerFuture(containerId)); + Assert.assertEquals(1, + stateMachine.getWriteChunkFutureMap().size()); + Assert.assertTrue( + stateMachine.getCommitChunkFutureMap(containerId).isEmpty()); + + //Add a closeContainerRequest + 
RaftClientRequest closeRequest = getRaftClientRequest( + ContainerTestHelper.getCloseContainer(pipeline, containerId)); + + TransactionContext closeCtx = stateMachine.startTransaction(closeRequest); + + // Now apply all the transaction for the CreateContainer Command. + // This will unblock writeChunks as well + + stateMachine.applyTransaction(createContainerCtxt); + stateMachine.applyTransaction(writeChunkcontext); + CompletableFuture putKeyFuture = + stateMachine.applyTransaction(putKeyContext); + waitForTransactionCompletion(putKeyFuture); + // Make sure the putKey transaction completes + Assert.assertTrue(putKeyFuture.isDone()); + + // Execute the closeContainer. This should ensure all prior Write Type + // container requests finish execution + + CompletableFuture closeFuture = + stateMachine.applyTransaction(closeCtx); + waitForTransactionCompletion(closeFuture); + // Make sure the closeContainer transaction completes + Assert.assertTrue(closeFuture.isDone()); + Assert.assertNull(stateMachine.getCreateContainerFuture(containerId)); + Assert.assertNull(stateMachine.getCommitChunkFutureMap(containerId)); + + } + + private RaftClientRequest getRaftClientRequest( + ContainerCommandRequestProto req) throws IOException { + ClientId clientId = ClientId.randomId(); + return new RaftClientRequest(clientId, + RatisHelper.toRaftPeerId(ContainerTestHelper.createDatanodeDetails()), + RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0, + Message.valueOf(req.toByteString()), RaftClientRequest + .writeRequestType(RaftProtos.ReplicationLevel.MAJORITY)); + } + + private void waitForTransactionCompletion( + CompletableFuture future) throws Exception { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService + .invokeAll(Collections.singleton(future::get), 10, + TimeUnit.SECONDS); // Timeout of 10 seconds. 
+ executorService.shutdown(); + } + + private TransactionContext startAndWriteStateMachineData( + RaftClientRequest request) throws IOException { + TransactionContext ctx = stateMachine.startTransaction(request); + RaftProtos.LogEntryProto e = ProtoUtils + .toLogEntryProto(ctx.getSMLogEntry(), request.getSeqNum(), + request.getCallId(), ClientId.randomId(), request.getCallId()); + ctx.setLogEntry(e); + stateMachine.writeStateMachineData(e); + return ctx; + } + + // ContainerDispatcher for test only purpose. + private static class TestContainerDispatcher implements ContainerDispatcher { + /** + * Dispatches commands to container layer. + * + * @param msg - Command Request + * @return Command Response + */ + @Override + public ContainerCommandResponseProto dispatch( + ContainerCommandRequestProto msg) { + return ContainerTestHelper.getCreateContainerResponse(msg); + } + + @Override + public void init() { + } + + @Override + public void shutdown() { + } + + @Override + public void setScmId(String scmId) { + } + + @Override + public Handler getHandler(ContainerType containerType) { + return null; + } + } +} \ No newline at end of file