HDDS-179. CloseContainer/PutKey command should be syncronized with write operations. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
0e832e7a74
commit
5ef29087ad
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
|
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdds.HddsUtils;
|
import org.apache.hadoop.hdds.HddsUtils;
|
||||||
import org.apache.ratis.protocol.RaftGroupId;
|
import org.apache.ratis.protocol.RaftGroupId;
|
||||||
@ -52,6 +53,9 @@
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
@ -95,6 +99,13 @@
|
|||||||
* {@link #applyTransaction} need to be enforced in the StateMachine
|
* {@link #applyTransaction} need to be enforced in the StateMachine
|
||||||
* implementation. For example, synchronization between writeChunk and
|
* implementation. For example, synchronization between writeChunk and
|
||||||
* createContainer in {@link ContainerStateMachine}.
|
* createContainer in {@link ContainerStateMachine}.
|
||||||
|
*
|
||||||
|
* PutKey is synchronized with WriteChunk operations, PutKey for a block is
|
||||||
|
* executed only after all the WriteChunk preceding the PutKey have finished.
|
||||||
|
*
|
||||||
|
* CloseContainer is synchronized with WriteChunk and PutKey operations,
|
||||||
|
* CloseContainer for a container is processed after all the preceding write
|
||||||
|
* operations for the container have finished.
|
||||||
* */
|
* */
|
||||||
public class ContainerStateMachine extends BaseStateMachine {
|
public class ContainerStateMachine extends BaseStateMachine {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(
|
static final Logger LOG = LoggerFactory.getLogger(
|
||||||
@ -105,15 +116,14 @@ public class ContainerStateMachine extends BaseStateMachine {
|
|||||||
private ThreadPoolExecutor chunkExecutor;
|
private ThreadPoolExecutor chunkExecutor;
|
||||||
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
|
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
|
||||||
writeChunkFutureMap;
|
writeChunkFutureMap;
|
||||||
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
|
private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
|
||||||
createContainerFutureMap;
|
|
||||||
|
|
||||||
ContainerStateMachine(ContainerDispatcher dispatcher,
|
public ContainerStateMachine(ContainerDispatcher dispatcher,
|
||||||
ThreadPoolExecutor chunkExecutor) {
|
ThreadPoolExecutor chunkExecutor) {
|
||||||
this.dispatcher = dispatcher;
|
this.dispatcher = dispatcher;
|
||||||
this.chunkExecutor = chunkExecutor;
|
this.chunkExecutor = chunkExecutor;
|
||||||
this.writeChunkFutureMap = new ConcurrentHashMap<>();
|
this.writeChunkFutureMap = new ConcurrentHashMap<>();
|
||||||
this.createContainerFutureMap = new ConcurrentHashMap<>();
|
this.stateMachineMap = new ConcurrentHashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -203,32 +213,6 @@ private Message runCommand(ContainerCommandRequestProto requestProto) {
|
|||||||
return dispatchCommand(requestProto)::toByteString;
|
return dispatchCommand(requestProto)::toByteString;
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompletableFuture<Message> handleWriteChunk(
|
|
||||||
ContainerCommandRequestProto requestProto, long entryIndex) {
|
|
||||||
final WriteChunkRequestProto write = requestProto.getWriteChunk();
|
|
||||||
long containerID = write.getBlockID().getContainerID();
|
|
||||||
CompletableFuture<Message> future =
|
|
||||||
createContainerFutureMap.get(containerID);
|
|
||||||
CompletableFuture<Message> writeChunkFuture;
|
|
||||||
if (future != null) {
|
|
||||||
writeChunkFuture = future.thenApplyAsync(
|
|
||||||
v -> runCommand(requestProto), chunkExecutor);
|
|
||||||
} else {
|
|
||||||
writeChunkFuture = CompletableFuture.supplyAsync(
|
|
||||||
() -> runCommand(requestProto), chunkExecutor);
|
|
||||||
}
|
|
||||||
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
|
|
||||||
return writeChunkFuture;
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<Message> handleCreateContainer(
|
|
||||||
ContainerCommandRequestProto requestProto) {
|
|
||||||
long containerID = requestProto.getContainerID();
|
|
||||||
createContainerFutureMap.
|
|
||||||
computeIfAbsent(containerID, k -> new CompletableFuture<>());
|
|
||||||
return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* writeStateMachineData calls are not synchronized with each other
|
* writeStateMachineData calls are not synchronized with each other
|
||||||
* and also with applyTransaction.
|
* and also with applyTransaction.
|
||||||
@ -239,15 +223,17 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
|
|||||||
final ContainerCommandRequestProto requestProto =
|
final ContainerCommandRequestProto requestProto =
|
||||||
getRequestProto(entry.getSmLogEntry().getStateMachineData());
|
getRequestProto(entry.getSmLogEntry().getStateMachineData());
|
||||||
Type cmdType = requestProto.getCmdType();
|
Type cmdType = requestProto.getCmdType();
|
||||||
switch (cmdType) {
|
long containerId = requestProto.getContainerID();
|
||||||
case CreateContainer:
|
stateMachineMap
|
||||||
return handleCreateContainer(requestProto);
|
.computeIfAbsent(containerId, k -> new StateMachineHelper());
|
||||||
case WriteChunk:
|
CompletableFuture<Message> stateMachineFuture =
|
||||||
return handleWriteChunk(requestProto, entry.getIndex());
|
stateMachineMap.get(containerId)
|
||||||
default:
|
.handleStateMachineData(requestProto, entry.getIndex());
|
||||||
throw new IllegalStateException("Cmd Type:" + cmdType
|
if (stateMachineFuture == null) {
|
||||||
+ " should not have state machine data");
|
throw new IllegalStateException(
|
||||||
|
"Cmd Type:" + cmdType + " should not have state machine data");
|
||||||
}
|
}
|
||||||
|
return stateMachineFuture;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
return completeExceptionally(e);
|
return completeExceptionally(e);
|
||||||
}
|
}
|
||||||
@ -363,25 +349,13 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
|
|||||||
try {
|
try {
|
||||||
ContainerCommandRequestProto requestProto =
|
ContainerCommandRequestProto requestProto =
|
||||||
getRequestProto(trx.getSMLogEntry().getData());
|
getRequestProto(trx.getSMLogEntry().getData());
|
||||||
Type cmdType = requestProto.getCmdType();
|
Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
|
||||||
|
stateMachineMap.computeIfAbsent(requestProto.getContainerID(),
|
||||||
if (cmdType == Type.WriteChunk) {
|
k -> new StateMachineHelper());
|
||||||
WriteChunkRequestProto write = requestProto.getWriteChunk();
|
long index =
|
||||||
// the data field has already been removed in start Transaction
|
trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex();
|
||||||
Preconditions.checkArgument(!write.hasData());
|
return stateMachineMap.get(requestProto.getContainerID())
|
||||||
CompletableFuture<Message> stateMachineFuture =
|
.executeContainerCommand(requestProto, index);
|
||||||
writeChunkFutureMap.remove(trx.getLogEntry().getIndex());
|
|
||||||
return stateMachineFuture
|
|
||||||
.thenComposeAsync(v ->
|
|
||||||
CompletableFuture.completedFuture(runCommand(requestProto)));
|
|
||||||
} else {
|
|
||||||
Message message = runCommand(requestProto);
|
|
||||||
if (cmdType == Type.CreateContainer) {
|
|
||||||
long containerID = requestProto.getContainerID();
|
|
||||||
createContainerFutureMap.remove(containerID).complete(message);
|
|
||||||
}
|
|
||||||
return CompletableFuture.completedFuture(message);
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
return completeExceptionally(e);
|
return completeExceptionally(e);
|
||||||
}
|
}
|
||||||
@ -396,4 +370,239 @@ private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
|
|||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to manage the future tasks for writeChunks.
|
||||||
|
*/
|
||||||
|
static class CommitChunkFutureMap {
|
||||||
|
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
|
||||||
|
block2ChunkMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
synchronized int removeAndGetSize(long index) {
|
||||||
|
block2ChunkMap.remove(index);
|
||||||
|
return block2ChunkMap.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized CompletableFuture<Message> add(long index,
|
||||||
|
CompletableFuture<Message> future) {
|
||||||
|
return block2ChunkMap.put(index, future);
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized List<CompletableFuture<Message>> getAll() {
|
||||||
|
return new ArrayList<>(block2ChunkMap.values());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class maintains maps and provide utilities to enforce synchronization
|
||||||
|
* among createContainer, writeChunk, putKey and closeContainer.
|
||||||
|
*/
|
||||||
|
private class StateMachineHelper {
|
||||||
|
|
||||||
|
private CompletableFuture<Message> createContainerFuture;
|
||||||
|
|
||||||
|
// Map for maintaining all writeChunk futures mapped to blockId
|
||||||
|
private final ConcurrentHashMap<Long, CommitChunkFutureMap>
|
||||||
|
block2ChunkMap;
|
||||||
|
|
||||||
|
// Map for putKey futures
|
||||||
|
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
|
||||||
|
blockCommitMap;
|
||||||
|
|
||||||
|
StateMachineHelper() {
|
||||||
|
createContainerFuture = null;
|
||||||
|
block2ChunkMap = new ConcurrentHashMap<>();
|
||||||
|
blockCommitMap = new ConcurrentHashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
// The following section handles writeStateMachineData transactions
|
||||||
|
// on a container
|
||||||
|
|
||||||
|
// enqueue the create container future during writeStateMachineData
|
||||||
|
// so that the write stateMachine data phase of writeChunk wait on
|
||||||
|
// create container to finish.
|
||||||
|
private CompletableFuture<Message> handleCreateContainer() {
|
||||||
|
createContainerFuture = new CompletableFuture<>();
|
||||||
|
return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This synchronizes on create container to finish
|
||||||
|
private CompletableFuture<Message> handleWriteChunk(
|
||||||
|
ContainerCommandRequestProto requestProto, long entryIndex) {
|
||||||
|
CompletableFuture<Message> containerOpFuture;
|
||||||
|
|
||||||
|
if (createContainerFuture != null) {
|
||||||
|
containerOpFuture = createContainerFuture
|
||||||
|
.thenApplyAsync(v -> runCommand(requestProto), chunkExecutor);
|
||||||
|
} else {
|
||||||
|
containerOpFuture = CompletableFuture
|
||||||
|
.supplyAsync(() -> runCommand(requestProto), chunkExecutor);
|
||||||
|
}
|
||||||
|
writeChunkFutureMap.put(entryIndex, containerOpFuture);
|
||||||
|
return containerOpFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletableFuture<Message> handleStateMachineData(
|
||||||
|
final ContainerCommandRequestProto requestProto, long index) {
|
||||||
|
Type cmdType = requestProto.getCmdType();
|
||||||
|
if (cmdType == Type.CreateContainer) {
|
||||||
|
return handleCreateContainer();
|
||||||
|
} else if (cmdType == Type.WriteChunk) {
|
||||||
|
return handleWriteChunk(requestProto, index);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The following section handles applyTransaction transactions
|
||||||
|
// on a container
|
||||||
|
|
||||||
|
private CompletableFuture<Message> handlePutKey(
|
||||||
|
ContainerCommandRequestProto requestProto) {
|
||||||
|
List<CompletableFuture<Message>> futureList = new ArrayList<>();
|
||||||
|
long localId =
|
||||||
|
requestProto.getPutKey().getKeyData().getBlockID().getLocalID();
|
||||||
|
// Need not wait for create container future here as it has already
|
||||||
|
// finished.
|
||||||
|
if (block2ChunkMap.get(localId) != null) {
|
||||||
|
futureList.addAll(block2ChunkMap.get(localId).getAll());
|
||||||
|
}
|
||||||
|
CompletableFuture<Message> effectiveFuture =
|
||||||
|
runCommandAfterFutures(futureList, requestProto);
|
||||||
|
|
||||||
|
CompletableFuture<Message> putKeyFuture =
|
||||||
|
effectiveFuture.thenApply(message -> {
|
||||||
|
blockCommitMap.remove(localId);
|
||||||
|
return message;
|
||||||
|
});
|
||||||
|
blockCommitMap.put(localId, putKeyFuture);
|
||||||
|
return putKeyFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close Container should be executed only if all pending WriteType
|
||||||
|
// container cmds get executed. Transactions which can return a future
|
||||||
|
// are WriteChunk and PutKey.
|
||||||
|
private CompletableFuture<Message> handleCloseContainer(
|
||||||
|
ContainerCommandRequestProto requestProto) {
|
||||||
|
List<CompletableFuture<Message>> futureList = new ArrayList<>();
|
||||||
|
|
||||||
|
// No need to wait for create container future here as it should have
|
||||||
|
// already finished.
|
||||||
|
block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll()));
|
||||||
|
futureList.addAll(blockCommitMap.values());
|
||||||
|
|
||||||
|
// There are pending write Chunk/PutKey type requests
|
||||||
|
// Queue this closeContainer request behind all these requests
|
||||||
|
CompletableFuture<Message> closeContainerFuture =
|
||||||
|
runCommandAfterFutures(futureList, requestProto);
|
||||||
|
|
||||||
|
return closeContainerFuture.thenApply(message -> {
|
||||||
|
stateMachineMap.remove(requestProto.getContainerID());
|
||||||
|
return message;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<Message> handleChunkCommit(
|
||||||
|
ContainerCommandRequestProto requestProto, long index) {
|
||||||
|
WriteChunkRequestProto write = requestProto.getWriteChunk();
|
||||||
|
// the data field has already been removed in start Transaction
|
||||||
|
Preconditions.checkArgument(!write.hasData());
|
||||||
|
CompletableFuture<Message> stateMachineFuture =
|
||||||
|
writeChunkFutureMap.remove(index);
|
||||||
|
CompletableFuture<Message> commitChunkFuture = stateMachineFuture
|
||||||
|
.thenComposeAsync(v -> CompletableFuture
|
||||||
|
.completedFuture(runCommand(requestProto)));
|
||||||
|
|
||||||
|
long localId = requestProto.getWriteChunk().getBlockID().getLocalID();
|
||||||
|
// Put the applyTransaction Future again to the Map.
|
||||||
|
// closeContainer should synchronize with this.
|
||||||
|
block2ChunkMap
|
||||||
|
.computeIfAbsent(localId, id -> new CommitChunkFutureMap())
|
||||||
|
.add(index, commitChunkFuture);
|
||||||
|
return commitChunkFuture.thenApply(message -> {
|
||||||
|
block2ChunkMap.computeIfPresent(localId, (containerId, chunks)
|
||||||
|
-> chunks.removeAndGetSize(index) == 0? null: chunks);
|
||||||
|
return message;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<Message> runCommandAfterFutures(
|
||||||
|
List<CompletableFuture<Message>> futureList,
|
||||||
|
ContainerCommandRequestProto requestProto) {
|
||||||
|
CompletableFuture<Message> effectiveFuture;
|
||||||
|
if (futureList.isEmpty()) {
|
||||||
|
effectiveFuture = CompletableFuture
|
||||||
|
.supplyAsync(() -> runCommand(requestProto));
|
||||||
|
|
||||||
|
} else {
|
||||||
|
CompletableFuture<Void> allFuture = CompletableFuture.allOf(
|
||||||
|
futureList.toArray(new CompletableFuture[futureList.size()]));
|
||||||
|
effectiveFuture = allFuture
|
||||||
|
.thenApplyAsync(v -> runCommand(requestProto));
|
||||||
|
}
|
||||||
|
return effectiveFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletableFuture<Message> handleCreateContainer(
|
||||||
|
ContainerCommandRequestProto requestProto) {
|
||||||
|
CompletableFuture<Message> future =
|
||||||
|
CompletableFuture.completedFuture(runCommand(requestProto));
|
||||||
|
future.thenAccept(m -> {
|
||||||
|
createContainerFuture.complete(m);
|
||||||
|
createContainerFuture = null;
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletableFuture<Message> handleOtherCommands(
|
||||||
|
ContainerCommandRequestProto requestProto) {
|
||||||
|
return CompletableFuture.completedFuture(runCommand(requestProto));
|
||||||
|
}
|
||||||
|
|
||||||
|
CompletableFuture<Message> executeContainerCommand(
|
||||||
|
ContainerCommandRequestProto requestProto, long index) {
|
||||||
|
Type cmdType = requestProto.getCmdType();
|
||||||
|
switch (cmdType) {
|
||||||
|
case WriteChunk:
|
||||||
|
return handleChunkCommit(requestProto, index);
|
||||||
|
case CloseContainer:
|
||||||
|
return handleCloseContainer(requestProto);
|
||||||
|
case PutKey:
|
||||||
|
return handlePutKey(requestProto);
|
||||||
|
case CreateContainer:
|
||||||
|
return handleCreateContainer(requestProto);
|
||||||
|
default:
|
||||||
|
return handleOtherCommands(requestProto);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public ConcurrentHashMap<Long, StateMachineHelper> getStateMachineMap() {
|
||||||
|
return stateMachineMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public CompletableFuture<Message> getCreateContainerFuture(long containerId) {
|
||||||
|
StateMachineHelper helper = stateMachineMap.get(containerId);
|
||||||
|
return helper == null ? null : helper.createContainerFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public List<CompletableFuture<Message>> getCommitChunkFutureMap(
|
||||||
|
long containerId) {
|
||||||
|
StateMachineHelper helper = stateMachineMap.get(containerId);
|
||||||
|
if (helper != null) {
|
||||||
|
List<CompletableFuture<Message>> futureList = new ArrayList<>();
|
||||||
|
stateMachineMap.get(containerId).block2ChunkMap.values()
|
||||||
|
.forEach(b -> futureList.addAll(b.getAll()));
|
||||||
|
return futureList;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Collection<CompletableFuture<Message>> getWriteChunkFutureMap() {
|
||||||
|
return writeChunkFutureMap.values();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,201 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ozone.container.server;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.client.BlockID;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
.ContainerCommandResponseProto;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
.ContainerCommandRequestProto;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
.ContainerType;
|
||||||
|
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
|
||||||
|
import org.apache.hadoop.ozone.container.ContainerTestHelper;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
|
||||||
|
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
|
||||||
|
import org.apache.hadoop.ozone.container.common.transport.server.ratis
|
||||||
|
.ContainerStateMachine;
|
||||||
|
import org.apache.ratis.RatisHelper;
|
||||||
|
import org.apache.ratis.protocol.ClientId;
|
||||||
|
import org.apache.ratis.protocol.Message;
|
||||||
|
import org.apache.ratis.protocol.RaftClientRequest;
|
||||||
|
import org.apache.ratis.shaded.proto.RaftProtos;
|
||||||
|
import org.apache.ratis.statemachine.TransactionContext;
|
||||||
|
import org.apache.ratis.util.ProtoUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class tests ContainerStateMachine.
|
||||||
|
*/
|
||||||
|
public class TestContainerStateMachine {
|
||||||
|
|
||||||
|
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
|
||||||
|
|
||||||
|
private static long nextCallId() {
|
||||||
|
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ThreadPoolExecutor executor =
|
||||||
|
new ThreadPoolExecutor(4, 4, 100, TimeUnit.SECONDS,
|
||||||
|
new ArrayBlockingQueue<>(1024),
|
||||||
|
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
private ContainerStateMachine stateMachine =
|
||||||
|
new ContainerStateMachine(new TestContainerDispatcher(), executor);
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCloseContainerSynchronization() throws Exception {
|
||||||
|
Pipeline pipeline = ContainerTestHelper.createPipeline(3);
|
||||||
|
long containerId = new Random().nextLong();
|
||||||
|
|
||||||
|
//create container request
|
||||||
|
RaftClientRequest createContainer = getRaftClientRequest(
|
||||||
|
ContainerTestHelper.getCreateContainerRequest(containerId, pipeline));
|
||||||
|
|
||||||
|
ContainerCommandRequestProto writeChunkProto = ContainerTestHelper
|
||||||
|
.getWriteChunkRequest(pipeline, new BlockID(containerId, nextCallId()),
|
||||||
|
1024);
|
||||||
|
|
||||||
|
RaftClientRequest writeChunkRequest = getRaftClientRequest(writeChunkProto);
|
||||||
|
|
||||||
|
// add putKey request
|
||||||
|
ContainerCommandRequestProto putKeyProto = ContainerTestHelper
|
||||||
|
.getPutKeyRequest(pipeline, writeChunkProto.getWriteChunk());
|
||||||
|
RaftClientRequest putKeyRequest = getRaftClientRequest(putKeyProto);
|
||||||
|
|
||||||
|
TransactionContext createContainerCtxt =
|
||||||
|
startAndWriteStateMachineData(createContainer);
|
||||||
|
// Start and Write into the StateMachine
|
||||||
|
TransactionContext writeChunkcontext =
|
||||||
|
startAndWriteStateMachineData(writeChunkRequest);
|
||||||
|
|
||||||
|
TransactionContext putKeyContext =
|
||||||
|
stateMachine.startTransaction(putKeyRequest);
|
||||||
|
Assert.assertEquals(1, stateMachine.getStateMachineMap().size());
|
||||||
|
Assert.assertNotNull(stateMachine.getCreateContainerFuture(containerId));
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
stateMachine.getWriteChunkFutureMap().size());
|
||||||
|
Assert.assertTrue(
|
||||||
|
stateMachine.getCommitChunkFutureMap(containerId).isEmpty());
|
||||||
|
|
||||||
|
//Add a closeContainerRequest
|
||||||
|
RaftClientRequest closeRequest = getRaftClientRequest(
|
||||||
|
ContainerTestHelper.getCloseContainer(pipeline, containerId));
|
||||||
|
|
||||||
|
TransactionContext closeCtx = stateMachine.startTransaction(closeRequest);
|
||||||
|
|
||||||
|
// Now apply all the transaction for the CreateContainer Command.
|
||||||
|
// This will unblock writeChunks as well
|
||||||
|
|
||||||
|
stateMachine.applyTransaction(createContainerCtxt);
|
||||||
|
stateMachine.applyTransaction(writeChunkcontext);
|
||||||
|
CompletableFuture<Message> putKeyFuture =
|
||||||
|
stateMachine.applyTransaction(putKeyContext);
|
||||||
|
waitForTransactionCompletion(putKeyFuture);
|
||||||
|
// Make sure the putKey transaction complete
|
||||||
|
Assert.assertTrue(putKeyFuture.isDone());
|
||||||
|
|
||||||
|
// Execute the closeContainer. This should ensure all prior Write Type
|
||||||
|
// container requests finish execution
|
||||||
|
|
||||||
|
CompletableFuture<Message> closeFuture =
|
||||||
|
stateMachine.applyTransaction(closeCtx);
|
||||||
|
waitForTransactionCompletion(closeFuture);
|
||||||
|
// Make sure the closeContainer transaction complete
|
||||||
|
Assert.assertTrue(closeFuture.isDone());
|
||||||
|
Assert.assertNull(stateMachine.getCreateContainerFuture(containerId));
|
||||||
|
Assert.assertNull(stateMachine.getCommitChunkFutureMap(containerId));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private RaftClientRequest getRaftClientRequest(
|
||||||
|
ContainerCommandRequestProto req) throws IOException {
|
||||||
|
ClientId clientId = ClientId.randomId();
|
||||||
|
return new RaftClientRequest(clientId,
|
||||||
|
RatisHelper.toRaftPeerId(ContainerTestHelper.createDatanodeDetails()),
|
||||||
|
RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
|
||||||
|
Message.valueOf(req.toByteString()), RaftClientRequest
|
||||||
|
.writeRequestType(RaftProtos.ReplicationLevel.MAJORITY));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitForTransactionCompletion(
|
||||||
|
CompletableFuture<Message> future) throws Exception {
|
||||||
|
ExecutorService executorService = Executors.newSingleThreadExecutor();
|
||||||
|
executorService
|
||||||
|
.invokeAll(Collections.singleton(future::get), 10,
|
||||||
|
TimeUnit.SECONDS); // Timeout of 10 minutes.
|
||||||
|
executorService.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
private TransactionContext startAndWriteStateMachineData(
|
||||||
|
RaftClientRequest request) throws IOException {
|
||||||
|
TransactionContext ctx = stateMachine.startTransaction(request);
|
||||||
|
RaftProtos.LogEntryProto e = ProtoUtils
|
||||||
|
.toLogEntryProto(ctx.getSMLogEntry(), request.getSeqNum(),
|
||||||
|
request.getCallId(), ClientId.randomId(), request.getCallId());
|
||||||
|
ctx.setLogEntry(e);
|
||||||
|
stateMachine.writeStateMachineData(e);
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ContainerDispatcher for test only purpose.
|
||||||
|
private static class TestContainerDispatcher implements ContainerDispatcher {
|
||||||
|
/**
|
||||||
|
* Dispatches commands to container layer.
|
||||||
|
*
|
||||||
|
* @param msg - Command Request
|
||||||
|
* @return Command Response
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public ContainerCommandResponseProto dispatch(
|
||||||
|
ContainerCommandRequestProto msg) {
|
||||||
|
return ContainerTestHelper.getCreateContainerResponse(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setScmId(String scmId) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Handler getHandler(ContainerType containerType) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user