diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index d353e7af42..2f118727cc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -31,9 +31,9 @@
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.util.Time;
-import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
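Context for reviewers: upstream Ratis moved its shaded dependencies out of ratis-common into the separate ratis-thirdparty artifact, so every `org.apache.ratis.shaded.*` import becomes `org.apache.ratis.thirdparty.*`, and the previously shaded Raft protos surface as plain `org.apache.ratis.proto.*`. A sketch of the rename pattern repeated across the files below:

```java
// Old imports (classes shaded inside Ratis itself):
//   import org.apache.ratis.shaded.io.grpc.ManagedChannel;
//   import org.apache.ratis.shaded.proto.RaftProtos;
// New imports (relocated to ratis-thirdparty; Raft protos unshaded):
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.proto.RaftProtos;
```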
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 0d301d9851..4efe7ba818 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.shaded.com.google.protobuf
+import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -39,7 +39,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.CheckedBiConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index a483197b0e..21b8974216 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.hdds.scm.storage;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
index cc1ea8dbe9..4547163d5d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
@@ -18,8 +18,9 @@
package org.apache.hadoop.hdds.scm.storage;
+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index eea2264f8f..bf2a6b9954 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -172,10 +172,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
generate-sources
-
-
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 63f5916828..71d6e07daf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -19,7 +19,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.TimeDuration;
import java.util.concurrent.TimeUnit;
@@ -62,6 +62,10 @@ public final class ScmConfigKeys {
= "dfs.container.ratis.replication.level";
public static final ReplicationLevel
DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY;
+ public static final String DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY
+ = "dfs.container.ratis.num.container.op.threads";
+ public static final int DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT
+ = 10;
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
"dfs.container.ratis.segment.size";
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
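The new executor-count key is read like any other int setting; a minimal sketch of the lookup (the same pattern XceiverServerRatis uses further down in this patch; the wrapper class is illustrative only):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;

public final class NumExecutorsConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Falls back to the default of 10 when the key is unset.
    int numContainerOpExecutors = conf.getInt(
        ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
        ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT);
    System.out.println("container op executors: " + numContainerOpExecutors);
  }
}
```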
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 1df50b1650..278b129d96 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -20,7 +20,7 @@
import org.apache.hadoop.hdds.scm.container.common.helpers
.BlockNotCommittedException;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers
.StorageContainerException;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index e8aa22c2ac..d6a6bf7d07 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -24,7 +24,7 @@
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.TimeDuration;
/**
@@ -220,6 +220,10 @@ public final class OzoneConfigKeys {
public static final ReplicationLevel
DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT;
+ public static final String DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY
+ = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY;
+ public static final int DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT
+ = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT;
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
index 379d9e9d1d..377153a3cf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
@@ -22,7 +22,8 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.thirdparty.com.google.common.annotations.
+ VisibleForTesting;
import org.rocksdb.DbPath;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
index 5078b3e96c..cdee10bd50 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java
@@ -23,7 +23,8 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.utils.RocksDBStoreMBean;
-import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.thirdparty.com.google.common.annotations.
+ VisibleForTesting;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
index 04bfeb2e84..2dbe2e6c19 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
@@ -32,8 +32,8 @@
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
@@ -103,7 +103,7 @@ static List<RaftPeer> toRaftPeers(
RaftGroupId DUMMY_GROUP_ID =
RaftGroupId.valueOf(ByteString.copyFromUtf8("AOzoneRatisGroup"));
- RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID,
+ RaftGroup EMPTY_GROUP = RaftGroup.valueOf(DUMMY_GROUP_ID,
Collections.emptyList());
static RaftGroup emptyRaftGroup() {
@@ -112,7 +112,7 @@ static RaftGroup emptyRaftGroup() {
static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
return peers.isEmpty()? emptyRaftGroup()
- : new RaftGroup(DUMMY_GROUP_ID, peers);
+ : RaftGroup.valueOf(DUMMY_GROUP_ID, peers);
}
static RaftGroup newRaftGroup(RaftGroupId groupId,
@@ -120,12 +120,12 @@ static RaftGroup newRaftGroup(RaftGroupId groupId,
final List<RaftPeer> newPeers = peers.stream()
.map(RatisHelper::toRaftPeer)
.collect(Collectors.toList());
- return peers.isEmpty() ? new RaftGroup(groupId, Collections.emptyList())
- : new RaftGroup(groupId, newPeers);
+ return peers.isEmpty() ? RaftGroup.valueOf(groupId, Collections.emptyList())
+ : RaftGroup.valueOf(groupId, newPeers);
}
static RaftGroup newRaftGroup(Pipeline pipeline) {
- return new RaftGroup(pipeline.getId().getRaftGroupID(),
+ return RaftGroup.valueOf(pipeline.getId().getRaftGroupID(),
toRaftPeers(pipeline));
}
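The Ratis snapshot this patch moves to drops the public `RaftGroup` constructor in favor of the `RaftGroup.valueOf` factory, hence the three call-site changes above. A minimal standalone sketch of the new call shape, reusing the dummy group id from this file (the wrapper class is illustrative only):

```java
import java.util.Collections;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

public final class RaftGroupFactoryExample {
  public static void main(String[] args) {
    // A 16-byte UTF-8 string, since RaftGroupId is UUID-backed.
    RaftGroupId id =
        RaftGroupId.valueOf(ByteString.copyFromUtf8("AOzoneRatisGroup"));
    // Previously: new RaftGroup(id, peers); now the factory method.
    RaftGroup group = RaftGroup.valueOf(id, Collections.emptyList());
    System.out.println(group);
  }
}
```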
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 456775019c..662df8f3aa 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -197,7 +197,6 @@ message ContainerCommandRequestProto {
optional PutSmallFileRequestProto putSmallFile = 19;
optional GetSmallFileRequestProto getSmallFile = 20;
-
optional GetCommittedBlockLengthRequestProto getCommittedBlockLength = 21;
}
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index ef54ed21e5..0fde6bb9fa 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -135,6 +135,14 @@
MAJORITY; MAJORITY is used as the default replication level.
+ <property>
+ <name>dfs.container.ratis.num.container.op.executors</name>
+ <value>10</value>
+ <tag>OZONE, RATIS, PERFORMANCE</tag>
+ <description>Number of executors that will be used by Ratis to execute
+ container ops (10 by default).</description>
+ </property>
+
<property>
<name>dfs.container.ratis.segment.size</name>
<value>1073741824</value>
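What the setting controls: `ContainerStateMachine` (changed below) routes each transaction to one of `numExecutors` single-threaded executors keyed by `containerID % numExecutors`, so operations on one container apply in order while different containers proceed in parallel. A self-contained sketch of that routing, assuming non-negative container IDs as the patch does:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class ShardedExecutorSketch {
  private final ExecutorService[] executors;

  ShardedExecutorSketch(int numExecutors) {
    executors = new ExecutorService[numExecutors];
    for (int i = 0; i < numExecutors; i++) {
      // Single thread per shard: tasks on one shard run strictly in order.
      executors[i] = Executors.newSingleThreadExecutor();
    }
  }

  ExecutorService forContainer(long containerID) {
    // Same container id -> same executor -> serialized container ops.
    return executors[(int) (containerID % executors.length)];
  }

  void shutdown() {
    for (ExecutorService e : executors) {
      e.shutdown();
    }
  }

  public static void main(String[] args) {
    ShardedExecutorSketch sketch = new ShardedExecutorSketch(10);
    sketch.forContainer(42L).submit(() -> System.out.println("op on 42"));
    sketch.shutdown();
  }
}
```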
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
index db4a86aa8c..5fc366160a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -24,7 +24,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto
.XceiverClientProtocolServiceGrpc;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index c51da98fa8..8ebfe49fe6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -33,10 +33,10 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.ratis.shaded.io.grpc.BindableService;
-import org.apache.ratis.shaded.io.grpc.Server;
-import org.apache.ratis.shaded.io.grpc.ServerBuilder;
-import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.BindableService;
+import org.apache.ratis.thirdparty.io.grpc.Server;
+import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index f07c95ff05..7b7be9123f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -26,7 +25,7 @@
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.shaded.com.google.protobuf
+import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage;
@@ -44,10 +43,10 @@
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.SMLogEntryProto;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -57,12 +56,12 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
@@ -98,44 +97,43 @@
*
* 2) Write chunk commit operation is executed after write chunk state machine
* operation. This will ensure that commit operation is sync'd with the state
- * machine operation.
- *
- * Synchronization between {@link #writeStateMachineData} and
- * {@link #applyTransaction} need to be enforced in the StateMachine
- * implementation. For example, synchronization between writeChunk and
+ * machine operation. For example, synchronization between writeChunk and
* createContainer in {@link ContainerStateMachine}.
- *
- * PutBlock is synchronized with WriteChunk operations, PutBlock for a block is
- * executed only after all the WriteChunk preceding the PutBlock have finished.
- *
- * CloseContainer is synchronized with WriteChunk and PutBlock operations,
- * CloseContainer for a container is processed after all the preceding write
- * operations for the container have finished.
- * */
+ */
public class ContainerStateMachine extends BaseStateMachine {
- static final Logger LOG = LoggerFactory.getLogger(
- ContainerStateMachine.class);
- private final SimpleStateMachineStorage storage
- = new SimpleStateMachineStorage();
+ static final Logger LOG =
+ LoggerFactory.getLogger(ContainerStateMachine.class);
+ private final SimpleStateMachineStorage storage =
+ new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
private ThreadPoolExecutor chunkExecutor;
private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
- private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
+ private final ConcurrentHashMap<Long, CompletableFuture<Boolean>>
+ createContainerFutureMap;
+ private ExecutorService[] executors;
+ private final int numExecutors;
/**
* CSM metrics.
*/
private final CSMMetrics metrics;
public ContainerStateMachine(ContainerDispatcher dispatcher,
- ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) {
+ ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
+ int numOfExecutors) {
this.dispatcher = dispatcher;
this.chunkExecutor = chunkExecutor;
this.ratisServer = ratisServer;
this.writeChunkFutureMap = new ConcurrentHashMap<>();
- this.stateMachineMap = new ConcurrentHashMap<>();
metrics = CSMMetrics.create();
+ this.createContainerFutureMap = new ConcurrentHashMap<>();
+ this.numExecutors = numOfExecutors;
+ executors = new ExecutorService[numExecutors];
+ for (int i = 0; i < numExecutors; i++) {
+ executors[i] = Executors.newSingleThreadExecutor();
+ }
}
@Override
@@ -229,6 +227,41 @@ private Message runCommand(ContainerCommandRequestProto requestProto) {
return dispatchCommand(requestProto)::toByteString;
}
+ private ExecutorService getCommandExecutor(
+ ContainerCommandRequestProto requestProto) {
+ int executorId = (int)(requestProto.getContainerID() % numExecutors);
+ return executors[executorId];
+ }
+
+ private CompletableFuture<Message> handleWriteChunk(
+ ContainerCommandRequestProto requestProto, long entryIndex) {
+ final WriteChunkRequestProto write = requestProto.getWriteChunk();
+ long containerID = write.getBlockID().getContainerID();
+ CompletableFuture<Boolean> future =
+ createContainerFutureMap.get(containerID);
+ CompletableFuture<Message> writeChunkFuture;
+ if (future != null) {
+ writeChunkFuture = future.thenApplyAsync(
+ v -> runCommand(requestProto), chunkExecutor);
+ } else {
+ writeChunkFuture = CompletableFuture.supplyAsync(
+ () -> runCommand(requestProto), chunkExecutor);
+ }
+ writeChunkFutureMap.put(entryIndex, writeChunkFuture);
+ // Remove the future once it finishes execution from the
+ // writeChunkFutureMap.
+ writeChunkFuture.thenApply(r -> writeChunkFutureMap.remove(entryIndex));
+ return writeChunkFuture;
+ }
+
+ private CompletableFuture<Message> handleCreateContainer(
+ ContainerCommandRequestProto requestProto) {
+ long containerID = requestProto.getContainerID();
+ createContainerFutureMap.
+ computeIfAbsent(containerID, k -> new CompletableFuture<>());
+ return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+ }
+
/*
* writeStateMachineData calls are not synchronized with each other
* and also with applyTransaction.
@@ -240,17 +273,15 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
Type cmdType = requestProto.getCmdType();
- long containerId = requestProto.getContainerID();
- stateMachineMap
- .computeIfAbsent(containerId, k -> new StateMachineHelper());
- CompletableFuture<Message> stateMachineFuture =
- stateMachineMap.get(containerId)
- .handleStateMachineData(requestProto, entry.getIndex());
- if (stateMachineFuture == null) {
- throw new IllegalStateException(
- "Cmd Type:" + cmdType + " should not have state machine data");
+ switch (cmdType) {
+ case CreateContainer:
+ return handleCreateContainer(requestProto);
+ case WriteChunk:
+ return handleWriteChunk(requestProto, entry.getIndex());
+ default:
+ throw new IllegalStateException("Cmd Type:" + cmdType
+ + " should not have state machine data");
}
- return stateMachineFuture;
} catch (IOException e) {
metrics.incNumWriteStateMachineFails();
return completeExceptionally(e);
@@ -270,14 +301,14 @@ public CompletableFuture<Message> query(Message request) {
}
}
- private LogEntryProto readStateMachineData(LogEntryProto entry,
+ private ByteString readStateMachineData(LogEntryProto entry,
ContainerCommandRequestProto requestProto) {
WriteChunkRequestProto writeChunkRequestProto =
requestProto.getWriteChunk();
// Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is
// written through writeStateMachineData.
- Preconditions.checkArgument(writeChunkRequestProto.getStage()
- == Stage.COMMIT_DATA);
+ Preconditions
+ .checkArgument(writeChunkRequestProto.getStage() == Stage.COMMIT_DATA);
// prepare the chunk to be read
ReadChunkRequestProto.Builder readChunkRequestProto =
@@ -286,8 +317,7 @@ private LogEntryProto readStateMachineData(LogEntryProto entry,
.setChunkData(writeChunkRequestProto.getChunkData());
ContainerCommandRequestProto dataContainerCommandProto =
ContainerCommandRequestProto.newBuilder(requestProto)
- .setCmdType(Type.ReadChunk)
- .setReadChunk(readChunkRequestProto)
+ .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
.build();
// read the chunk
@@ -302,25 +332,13 @@ private LogEntryProto readStateMachineData(LogEntryProto entry,
final WriteChunkRequestProto.Builder dataWriteChunkProto =
WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
// adding the state machine data
- .setData(responseProto.getData())
- .setStage(Stage.WRITE_DATA);
+ .setData(responseProto.getData()).setStage(Stage.WRITE_DATA);
ContainerCommandRequestProto.Builder newStateMachineProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setWriteChunk(dataWriteChunkProto);
- return recreateLogEntryProto(entry,
- newStateMachineProto.build().toByteString());
- }
-
- private LogEntryProto recreateLogEntryProto(LogEntryProto entry,
- ByteString stateMachineData) {
- // recreate the log entry
- final SMLogEntryProto log =
- SMLogEntryProto.newBuilder(entry.getSmLogEntry())
- .setStateMachineData(stateMachineData)
- .build();
- return LogEntryProto.newBuilder(entry).setSmLogEntry(log).build();
+ return newStateMachineProto.build().toByteString();
}
/**
@@ -347,11 +365,11 @@ public CompletableFuture<Void> flushStateMachineData(long index) {
* evicted.
*/
@Override
- public CompletableFuture<LogEntryProto> readStateMachineData(
+ public CompletableFuture<ByteString> readStateMachineData(
LogEntryProto entry) {
SMLogEntryProto smLogEntryProto = entry.getSmLogEntry();
if (!smLogEntryProto.getStateMachineData().isEmpty()) {
- return CompletableFuture.completedFuture(entry);
+ return CompletableFuture.completedFuture(ByteString.EMPTY);
}
try {
@@ -365,9 +383,7 @@ public CompletableFuture<ByteString> readStateMachineData(
readStateMachineData(entry, requestProto),
chunkExecutor);
} else if (requestProto.getCmdType() == Type.CreateContainer) {
- LogEntryProto log =
- recreateLogEntryProto(entry, requestProto.toByteString());
- return CompletableFuture.completedFuture(log);
+ return CompletableFuture.completedFuture(requestProto.toByteString());
} else {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+ " cannot have state machine data");
@@ -387,13 +403,44 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getSMLogEntry().getData());
- Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
- stateMachineMap.computeIfAbsent(requestProto.getContainerID(),
- k -> new StateMachineHelper());
- long index =
- trx.getLogEntry() == null ? -1 : trx.getLogEntry().getIndex();
- return stateMachineMap.get(requestProto.getContainerID())
- .executeContainerCommand(requestProto, index);
+ Type cmdType = requestProto.getCmdType();
+ CompletableFuture<Message> future;
+ if (cmdType == Type.PutBlock) {
+ BlockData blockData;
+ ContainerProtos.BlockData blockDataProto =
+ requestProto.getPutBlock().getBlockData();
+
+ // set the blockCommitSequenceId
+ try {
+ blockData = BlockData.getFromProtoBuf(blockDataProto);
+ } catch (IOException ioe) {
+ LOG.error("unable to retrieve blockData info for Block {}",
+ blockDataProto.getBlockID());
+ return completeExceptionally(ioe);
+ }
+ blockData.setBlockCommitSequenceId(trx.getLogEntry().getIndex());
+ final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
+ ContainerProtos.PutBlockRequestProto
+ .newBuilder(requestProto.getPutBlock())
+ .setBlockData(blockData.getProtoBufMessage()).build();
+ ContainerCommandRequestProto containerCommandRequestProto =
+ ContainerCommandRequestProto.newBuilder(requestProto)
+ .setPutBlock(putBlockRequestProto).build();
+ future = CompletableFuture
+ .supplyAsync(() -> runCommand(containerCommandRequestProto),
+ getCommandExecutor(requestProto));
+ } else {
+ future = CompletableFuture.supplyAsync(() -> runCommand(requestProto),
+ getCommandExecutor(requestProto));
+ }
+ // Mark the createContainerFuture complete so that writeStateMachineData
+ // for WriteChunk gets unblocked
+ if (cmdType == Type.CreateContainer) {
+ long containerID = requestProto.getContainerID();
+ future.thenApply(
+ r -> createContainerFutureMap.remove(containerID).complete(null));
+ }
+ return future;
} catch (IOException e) {
metrics.incNumApplyTransactionsFails();
return completeExceptionally(e);
@@ -419,259 +466,8 @@ public void notifyExtendedNoLeader(RaftGroup group,
@Override
public void close() throws IOException {
- }
-
- /**
- * Class to manage the future tasks for writeChunks.
- */
- static class CommitChunkFutureMap {
- private final ConcurrentHashMap<Long, CompletableFuture<Message>>
- block2ChunkMap = new ConcurrentHashMap<>();
-
- synchronized int removeAndGetSize(long index) {
- block2ChunkMap.remove(index);
- return block2ChunkMap.size();
+ for (int i = 0; i < numExecutors; i++) {
+ executors[i].shutdown();
}
-
- synchronized CompletableFuture<Message> add(long index,
- CompletableFuture<Message> future) {
- return block2ChunkMap.put(index, future);
- }
-
- synchronized List<CompletableFuture<Message>> getAll() {
- return new ArrayList<>(block2ChunkMap.values());
- }
- }
-
- /**
- * This class maintains maps and provide utilities to enforce synchronization
- * among createContainer, writeChunk, putBlock and closeContainer.
- */
- private class StateMachineHelper {
-
- private CompletableFuture<Message> createContainerFuture;
-
- // Map for maintaining all writeChunk futures mapped to blockId
- private final ConcurrentHashMap<Long, CommitChunkFutureMap>
- block2ChunkMap;
-
- // Map for putBlock futures
- private final ConcurrentHashMap<Long, CompletableFuture<Message>>
- blockCommitMap;
-
- StateMachineHelper() {
- createContainerFuture = null;
- block2ChunkMap = new ConcurrentHashMap<>();
- blockCommitMap = new ConcurrentHashMap<>();
- }
-
- // The following section handles writeStateMachineData transactions
- // on a container
-
- // enqueue the create container future during writeStateMachineData
- // so that the write stateMachine data phase of writeChunk wait on
- // create container to finish.
- private CompletableFuture<Message> handleCreateContainer() {
- createContainerFuture = new CompletableFuture<>();
- return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
- }
-
- // This synchronizes on create container to finish
- private CompletableFuture<Message> handleWriteChunk(
- ContainerCommandRequestProto requestProto, long entryIndex) {
- CompletableFuture<Message> containerOpFuture;
-
- if (createContainerFuture != null) {
- containerOpFuture = createContainerFuture
- .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor);
- } else {
- containerOpFuture = CompletableFuture
- .supplyAsync(() -> runCommand(requestProto), chunkExecutor);
- }
- writeChunkFutureMap.put(entryIndex, containerOpFuture);
- return containerOpFuture;
- }
-
- CompletableFuture<Message> handleStateMachineData(
- final ContainerCommandRequestProto requestProto, long index) {
- Type cmdType = requestProto.getCmdType();
- if (cmdType == Type.CreateContainer) {
- return handleCreateContainer();
- } else if (cmdType == Type.WriteChunk) {
- return handleWriteChunk(requestProto, index);
- } else {
- return null;
- }
- }
-
- // The following section handles applyTransaction transactions
- // on a container
-
- private CompletableFuture<Message> handlePutBlock(
- ContainerCommandRequestProto requestProto, long index) {
- List<CompletableFuture<Message>> futureList = new ArrayList<>();
- BlockData blockData = null;
- ContainerProtos.BlockData blockDataProto =
- requestProto.getPutBlock().getBlockData();
-
- // set the blockCommitSequenceId
- try {
- blockData = BlockData.getFromProtoBuf(blockDataProto);
- } catch (IOException ioe) {
- LOG.error("unable to retrieve blockData info for Block {}",
- blockDataProto.getBlockID());
- return completeExceptionally(ioe);
- }
- blockData.setBlockCommitSequenceId(index);
- final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
- ContainerProtos.PutBlockRequestProto
- .newBuilder(requestProto.getPutBlock())
- .setBlockData(blockData.getProtoBufMessage()).build();
- ContainerCommandRequestProto containerCommandRequestProto =
- ContainerCommandRequestProto.newBuilder(requestProto)
- .setPutBlock(putBlockRequestProto).build();
- long localId = blockDataProto.getBlockID().getLocalID();
- // Need not wait for create container future here as it has already
- // finished.
- if (block2ChunkMap.get(localId) != null) {
- futureList.addAll(block2ChunkMap.get(localId).getAll());
- }
- CompletableFuture<Message> effectiveFuture =
- runCommandAfterFutures(futureList, containerCommandRequestProto);
-
- CompletableFuture<Message> putBlockFuture =
- effectiveFuture.thenApply(message -> {
- blockCommitMap.remove(localId);
- return message;
- });
- blockCommitMap.put(localId, putBlockFuture);
- return putBlockFuture;
- }
-
- // Close Container should be executed only if all pending WriteType
- // container cmds get executed. Transactions which can return a future
- // are WriteChunk and PutBlock.
- private CompletableFuture<Message> handleCloseContainer(
- ContainerCommandRequestProto requestProto) {
- List<CompletableFuture<Message>> futureList = new ArrayList<>();
-
- // No need to wait for create container future here as it should have
- // already finished.
- block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll()));
- futureList.addAll(blockCommitMap.values());
-
- // There are pending write Chunk/PutBlock type requests
- // Queue this closeContainer request behind all these requests
- CompletableFuture<Message> closeContainerFuture =
- runCommandAfterFutures(futureList, requestProto);
-
- return closeContainerFuture.thenApply(message -> {
- stateMachineMap.remove(requestProto.getContainerID());
- return message;
- });
- }
-
- private CompletableFuture<Message> handleChunkCommit(
- ContainerCommandRequestProto requestProto, long index) {
- WriteChunkRequestProto write = requestProto.getWriteChunk();
- // the data field has already been removed in start Transaction
- Preconditions.checkArgument(!write.hasData());
- CompletableFuture<Message> stateMachineFuture =
- writeChunkFutureMap.remove(index);
- CompletableFuture<Message> commitChunkFuture = stateMachineFuture
- .thenComposeAsync(v -> CompletableFuture
- .completedFuture(runCommand(requestProto)));
-
- long localId = requestProto.getWriteChunk().getBlockID().getLocalID();
- // Put the applyTransaction Future again to the Map.
- // closeContainer should synchronize with this.
- block2ChunkMap
- .computeIfAbsent(localId, id -> new CommitChunkFutureMap())
- .add(index, commitChunkFuture);
- return commitChunkFuture.thenApply(message -> {
- block2ChunkMap.computeIfPresent(localId, (containerId, chunks)
- -> chunks.removeAndGetSize(index) == 0? null: chunks);
- return message;
- });
- }
-
- private CompletableFuture<Message> runCommandAfterFutures(
- List<CompletableFuture<Message>> futureList,
- ContainerCommandRequestProto requestProto) {
- CompletableFuture<Message> effectiveFuture;
- if (futureList.isEmpty()) {
- effectiveFuture = CompletableFuture
- .supplyAsync(() -> runCommand(requestProto));
-
- } else {
- CompletableFuture<Void> allFuture = CompletableFuture.allOf(
- futureList.toArray(new CompletableFuture[futureList.size()]));
- effectiveFuture = allFuture
- .thenApplyAsync(v -> runCommand(requestProto));
- }
- return effectiveFuture;
- }
-
- CompletableFuture<Message> handleCreateContainer(
- ContainerCommandRequestProto requestProto) {
- CompletableFuture<Message> future =
- CompletableFuture.completedFuture(runCommand(requestProto));
- future.thenAccept(m -> {
- createContainerFuture.complete(m);
- createContainerFuture = null;
- });
- return future;
- }
-
- CompletableFuture<Message> handleOtherCommands(
- ContainerCommandRequestProto requestProto) {
- return CompletableFuture.completedFuture(runCommand(requestProto));
- }
-
- CompletableFuture<Message> executeContainerCommand(
- ContainerCommandRequestProto requestProto, long index) {
- Type cmdType = requestProto.getCmdType();
- switch (cmdType) {
- case WriteChunk:
- return handleChunkCommit(requestProto, index);
- case CloseContainer:
- return handleCloseContainer(requestProto);
- case PutBlock:
- return handlePutBlock(requestProto, index);
- case CreateContainer:
- return handleCreateContainer(requestProto);
- default:
- return handleOtherCommands(requestProto);
- }
- }
- }
-
- @VisibleForTesting
- public ConcurrentHashMap<Long, StateMachineHelper> getStateMachineMap() {
- return stateMachineMap;
- }
-
- @VisibleForTesting
- public CompletableFuture<Message> getCreateContainerFuture(long containerId) {
- StateMachineHelper helper = stateMachineMap.get(containerId);
- return helper == null ? null : helper.createContainerFuture;
- }
-
- @VisibleForTesting
- public List<CompletableFuture<Message>> getCommitChunkFutureMap(
- long containerId) {
- StateMachineHelper helper = stateMachineMap.get(containerId);
- if (helper != null) {
- List<CompletableFuture<Message>> futureList = new ArrayList<>();
- stateMachineMap.get(containerId).block2ChunkMap.values()
- .forEach(b -> futureList.addAll(b.getAll()));
- return futureList;
- }
- return null;
- }
-
- @VisibleForTesting
- public Collection<CompletableFuture<Message>> getWriteChunkFutureMap() {
- return writeChunkFutureMap.values();
}
}
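The refactor above deletes the heavyweight per-container `StateMachineHelper`; the one ordering constraint it keeps is that a WriteChunk's data phase must not run before its container exists. `handleCreateContainer` parks a future in `createContainerFutureMap` when the create is logged, `handleWriteChunk` chains behind that future if present, and `applyTransaction` completes it once the create executes. A self-contained sketch of that gating pattern (all names hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class CreateGateSketch {
  private final ConcurrentHashMap<Long, CompletableFuture<Void>> createGates =
      new ConcurrentHashMap<>();
  private final ExecutorService chunkExecutor = Executors.newFixedThreadPool(4);

  // CreateContainer logged: park a gate for the container.
  void onCreateLogged(long containerID) {
    createGates.computeIfAbsent(containerID, k -> new CompletableFuture<>());
  }

  // WriteChunk data phase: chain behind the gate if one is open.
  CompletableFuture<String> onWriteChunk(long containerID, String chunk) {
    CompletableFuture<Void> gate = createGates.get(containerID);
    return gate != null
        ? gate.thenApplyAsync(v -> "wrote " + chunk, chunkExecutor)
        : CompletableFuture.supplyAsync(() -> "wrote " + chunk, chunkExecutor);
  }

  // CreateContainer applied: complete the gate, releasing queued writes.
  void onCreateApplied(long containerID) {
    CompletableFuture<Void> gate = createGates.remove(containerID);
    if (gate != null) {
      gate.complete(null);
    }
  }

  public static void main(String[] args) throws Exception {
    CreateGateSketch sm = new CreateGateSketch();
    sm.onCreateLogged(1L);
    CompletableFuture<String> w = sm.onWriteChunk(1L, "chunk-1"); // queued
    sm.onCreateApplied(1L);                                       // released
    System.out.println(w.get());
    sm.chunkExecutor.shutdown();
  }
}
```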
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index c2ef504c98..9094217ba0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -56,9 +56,9 @@
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.shaded.proto.RaftProtos;
-import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -97,6 +97,7 @@ private static long nextCallId() {
private final StateContext context;
private final ReplicationLevel replicationLevel;
private long nodeFailureTimeoutMs;
+ private ContainerStateMachine stateMachine;
private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext context)
@@ -112,12 +113,15 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
+ final int numContainerOpExecutors = conf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT);
this.context = context;
this.replicationLevel =
conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
- ContainerStateMachine stateMachine =
- new ContainerStateMachine(dispatcher, chunkExecutor, this);
+ stateMachine = new ContainerStateMachine(dispatcher, chunkExecutor, this,
+ numContainerOpExecutors);
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
.setProperties(serverProperties)
@@ -292,6 +296,7 @@ public void start() throws IOException {
public void stop() {
try {
chunkExecutor.shutdown();
+ stateMachine.close();
server.close();
} catch (IOException e) {
throw new RuntimeException(e);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 62e328eac9..8bdae0f1fa 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -35,7 +35,7 @@
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
index 34953633aa..dee0c11b97 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
@@ -21,7 +21,7 @@
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
index d96fbfacb5..61a303fcdd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java
@@ -27,7 +27,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ratis.shaded.com.google.protobuf
+import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
index 3aafb0cb0e..c8a40b2bc5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java
@@ -36,9 +36,9 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.io.grpc.ManagedChannel;
-import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index d8f696f47d..30a251d4e3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -30,8 +30,8 @@
import org.apache.hadoop.hdds.protocol.datanode.proto
.IntraDatanodeProtocolServiceGrpc;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
-import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index fc622b2d19..fc84ae7f68 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -40,7 +40,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 6580c2c9f1..de666ceadf 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -712,6 +712,11 @@ long getBlockCommitSequenceId() throws IOException {
if (this.outputStream instanceof ChunkOutputStream) {
ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
return out.getBlockCommitSequenceId();
+ } else if (outputStream == null) {
+ // For a pre-allocated block for which no write has been initiated,
+ // the OutputStream will be null here.
+ // In such cases, the default blockCommitSequenceId will be 0.
+ return 0;
}
throw new IOException("Invalid Output Stream for Key: " + key);
}
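The null branch added here covers pre-allocated blocks that were never written. A tiny sketch of the resulting contract, with a hypothetical stand-in for Ozone's stream types:

```java
import java.io.IOException;
import java.io.OutputStream;

final class BlockCommitSequenceIdSketch {
  // Hypothetical stand-in for ChunkOutputStream's accessor.
  interface CommitSequenceAware {
    long getBlockCommitSequenceId();
  }

  static long blockCommitSequenceId(OutputStream out) throws IOException {
    if (out instanceof CommitSequenceAware) {
      return ((CommitSequenceAware) out).getBlockCommitSequenceId();
    } else if (out == null) {
      // Pre-allocated block, no write initiated yet: default is 0.
      return 0;
    }
    throw new IOException("Invalid output stream for key");
  }
}
```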
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index f606263e61..da8d334e5b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -341,7 +341,6 @@ public void testDiscardPreallocatedBlocks() throws Exception {
Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
// With the initial size provided, it should have pre-allocated 2 blocks
Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
- Assert.assertEquals(2, groupOutputStream.getLocationInfoList().size());
String dataString = fixedLengthString(keyString, (1 * blockSize));
byte[] data = dataString.getBytes();
key.write(data);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index f27847941e..324187c3a2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -25,7 +25,7 @@
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -439,6 +439,7 @@ public static ContainerCommandRequestProto getPutBlockRequest(
List<ContainerProtos.ChunkInfo> newList = new LinkedList<>();
newList.add(writeRequest.getChunkData());
blockData.setChunks(newList);
+ blockData.setBlockCommitSequenceId(0);
putRequest.setBlockData(blockData.getProtoBufMessage());
ContainerCommandRequestProto.Builder request =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
index 78bf008bb9..92bad270f0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java
@@ -32,7 +32,7 @@
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index de55d9eb27..e6ebbf1b29 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -24,7 +24,6 @@
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
-import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
deleted file mode 100644
index c875a7e99b..0000000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.server;
-
-
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerType;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.interfaces.Handler;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis
- .ContainerStateMachine;
-import org.apache.ratis.RatisHelper;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.shaded.proto.RaftProtos;
-import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.util.ProtoUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Random;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This class tests ContainerStateMachine.
- */
-public class TestContainerStateMachine {
-
- private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
-
- private static long nextCallId() {
- return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
- }
-
- private ThreadPoolExecutor executor =
- new ThreadPoolExecutor(4, 4, 100, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1024),
- new ThreadPoolExecutor.CallerRunsPolicy());
- private ContainerStateMachine stateMachine =
- new ContainerStateMachine(new TestContainerDispatcher(), executor, null);
-
-
- @Test
- public void testCloseContainerSynchronization() throws Exception {
- Pipeline pipeline = ContainerTestHelper.createPipeline(3);
- long containerId = new Random().nextLong();
-
- //create container request
- RaftClientRequest createContainer = getRaftClientRequest(
- ContainerTestHelper.getCreateContainerRequest(containerId, pipeline));
-
- ContainerCommandRequestProto writeChunkProto = ContainerTestHelper
- .getWriteChunkRequest(pipeline, new BlockID(containerId, nextCallId()),
- 1024);
-
- RaftClientRequest writeChunkRequest = getRaftClientRequest(writeChunkProto);
-
- // add putKey request
- ContainerCommandRequestProto putKeyProto = ContainerTestHelper
- .getPutBlockRequest(pipeline, writeChunkProto.getWriteChunk());
- RaftClientRequest putKeyRequest = getRaftClientRequest(putKeyProto);
-
- TransactionContext createContainerCtxt =
- startAndWriteStateMachineData(createContainer);
- // Start and Write into the StateMachine
- TransactionContext writeChunkcontext =
- startAndWriteStateMachineData(writeChunkRequest);
-
- TransactionContext putKeyContext =
- stateMachine.startTransaction(putKeyRequest);
- Assert.assertEquals(1, stateMachine.getStateMachineMap().size());
- Assert.assertNotNull(stateMachine.getCreateContainerFuture(containerId));
- Assert.assertEquals(1,
- stateMachine.getWriteChunkFutureMap().size());
- Assert.assertTrue(
- stateMachine.getCommitChunkFutureMap(containerId).isEmpty());
-
- //Add a closeContainerRequest
- RaftClientRequest closeRequest = getRaftClientRequest(
- ContainerTestHelper.getCloseContainer(pipeline, containerId));
-
- TransactionContext closeCtx = stateMachine.startTransaction(closeRequest);
-
- // Now apply all the transaction for the CreateContainer Command.
- // This will unblock writeChunks as well
-
- stateMachine.applyTransaction(createContainerCtxt);
- stateMachine.applyTransaction(writeChunkcontext);
- CompletableFuture<Message> putKeyFuture =
- stateMachine.applyTransaction(putKeyContext);
- waitForTransactionCompletion(putKeyFuture);
- // Make sure the putKey transaction complete
- Assert.assertTrue(putKeyFuture.isDone());
-
- // Execute the closeContainer. This should ensure all prior Write Type
- // container requests finish execution
-
- CompletableFuture<Message> closeFuture =
- stateMachine.applyTransaction(closeCtx);
- waitForTransactionCompletion(closeFuture);
- // Make sure the closeContainer transaction complete
- Assert.assertTrue(closeFuture.isDone());
- Assert.assertNull(stateMachine.getCreateContainerFuture(containerId));
- Assert.assertNull(stateMachine.getCommitChunkFutureMap(containerId));
-
- }
-
- private RaftClientRequest getRaftClientRequest(
- ContainerCommandRequestProto req) throws IOException {
- ClientId clientId = ClientId.randomId();
- return new RaftClientRequest(clientId,
- RatisHelper.toRaftPeerId(ContainerTestHelper.createDatanodeDetails()),
- RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0,
- Message.valueOf(req.toByteString()), RaftClientRequest
- .writeRequestType(RaftProtos.ReplicationLevel.MAJORITY));
- }
-
- private void waitForTransactionCompletion(
- CompletableFuture<Message> future) throws Exception {
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- executorService
- .invokeAll(Collections.singleton(future::get), 10,
- TimeUnit.SECONDS); // Timeout of 10 seconds.
- executorService.shutdown();
- }
-
- private TransactionContext startAndWriteStateMachineData(
- RaftClientRequest request) throws IOException {
- TransactionContext ctx = stateMachine.startTransaction(request);
- RaftProtos.LogEntryProto e = ProtoUtils
- .toLogEntryProto(ctx.getSMLogEntry(), request.getSeqNum(),
- request.getCallId(), ClientId.randomId(), request.getCallId());
- ctx.setLogEntry(e);
- stateMachine.writeStateMachineData(e);
- return ctx;
- }
-
- // ContainerDispatcher for test only purpose.
- private static class TestContainerDispatcher implements ContainerDispatcher {
- /**
- * Dispatches commands to container layer.
- *
- * @param msg - Command Request
- * @return Command Response
- */
- @Override
- public ContainerCommandResponseProto dispatch(
- ContainerCommandRequestProto msg) {
- return ContainerTestHelper.getCreateContainerResponse(msg);
- }
-
- @Override
- public void init() {
- }
-
- @Override
- public void shutdown() {
- }
-
- @Override
- public void setScmId(String scmId) {
- }
-
- @Override
- public Handler getHandler(ContainerType containerType) {
- return null;
- }
- }
-}
\ No newline at end of file
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
index 8811d910fe..362e53f265 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java
@@ -23,7 +23,7 @@
.DatanodeStateMachine.DatanodeStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 5f88371851..7472a5402d 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -103,7 +103,7 @@
1.0.0-M33
- <ratis.version>0.3.0-eca3531-SNAPSHOT</ratis.version>
+ <ratis.version>0.3.0-9b84d79-SNAPSHOT</ratis.version>
1.0-alpha-1
3.3.1
2.4.12