From 94c0346f356096cd4d150655a70cdd357f50bfd7 Mon Sep 17 00:00:00 2001
From: Mukul Kumar Singh
Date: Tue, 23 Jan 2018 11:19:46 +0530
Subject: [PATCH] HDFS-13024. Ozone: ContainerStateMachine should synchronize
 operations between createContainer and writeChunk. Contributed by Mukul
 Kumar Singh.

---
 .../apache/hadoop/ozone/OzoneConfigKeys.java   |   4 +
 .../org/apache/hadoop/scm/ScmConfigKeys.java   |   6 +-
 .../server/ratis/ContainerStateMachine.java    | 115 ++++++++++++++----
 .../server/ratis/XceiverServerRatis.java       |  34 ++++--
 .../src/main/resources/ozone-default.xml       |  12 +-
 5 files changed, 134 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8059b5eee9..22a4787d71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -229,6 +229,10 @@ public final class OzoneConfigKeys {
       = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
       = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY;
+  public static final int DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT;
   public static final int DFS_CONTAINER_CHUNK_MAX_SIZE
       = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
   public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index b41db77492..6f5c8737c4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -57,7 +57,11 @@ public final class ScmConfigKeys {
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
       "dfs.container.ratis.segment.size";
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
-      128 * 1024 * 1024;
+      1 * 1024 * 1024 * 1024;
+  public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
+      "dfs.container.ratis.segment.preallocated.size";
+  public static final int
+      DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = 128 * 1024 * 1024;
 
   // TODO : this is copied from OzoneConsts, may need to move to a better place
   public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index a4517b3b98..c96cc5d0a0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -18,6 +18,7 @@ package
 org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -45,29 +46,61 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ArrayBlockingQueue;
 
-/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */
+/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
+ *
+ * The state machine is responsible for handling the different types of
+ * container requests. Container requests can be divided into read-only and
+ * write requests.
+ *
+ * Read-only requests are identified by
+ * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly} and are
+ * answered directly from {@link #query(RaftClientRequest)}.
+ *
+ * Write requests can be further divided into requests with user data
+ * (WriteChunkRequest) and requests without user data.
+ *
+ * In order to optimize write throughput, a writeChunk request is processed
+ * in two phases. The request is split in
+ * {@link #startTransaction(RaftClientRequest)}: in the first phase the user
+ * data is written directly into the state machine via
+ * {@link #writeStateMachineData}, and in the second phase the transaction is
+ * committed via {@link #applyTransaction(TransactionContext)}.
+ *
+ * Requests with no state machine data are committed directly through
+ * {@link #applyTransaction(TransactionContext)}.
+ *
+ * Two orderings are currently enforced in the code:
+ *
+ * 1) A write chunk operation is executed only after the corresponding create
+ * container operation; otherwise the write chunk would fail because the
+ * container has not been created yet. The create container request is
+ * therefore also split in {@link #startTransaction(RaftClientRequest)}, which
+ * makes it possible to synchronize the calls in {@link #writeStateMachineData}.
+ *
+ * 2) The write chunk commit operation is executed only after the write chunk
+ * state machine operation, which keeps the commit in sync with the state
+ * machine operation.
+ */
 public class ContainerStateMachine extends BaseStateMachine {
   static final Logger LOG = LoggerFactory.getLogger(
       ContainerStateMachine.class);
   private final SimpleStateMachineStorage storage
       = new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
-  private final ThreadPoolExecutor writeChunkExecutor;
+  private ThreadPoolExecutor writeChunkExecutor;
   private final ConcurrentHashMap<String, CompletableFuture<Message>>
-      writeChunkMap;
+      writeChunkFutureMap;
+  private final ConcurrentHashMap<String, CompletableFuture<Message>>
+      createContainerFutureMap;
 
   ContainerStateMachine(ContainerDispatcher dispatcher,
-      int numWriteChunkThreads) {
+      ThreadPoolExecutor writeChunkExecutor) {
     this.dispatcher = dispatcher;
-    writeChunkMap = new ConcurrentHashMap<>();
-    writeChunkExecutor =
-        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
-            60, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(1024),
-            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.writeChunkExecutor = writeChunkExecutor;
+    this.writeChunkFutureMap = new ConcurrentHashMap<>();
+    this.createContainerFutureMap = new ConcurrentHashMap<>();
   }
 
   @Override
@@ -81,13 +114,13 @@ public void initialize(
       throws IOException {
     super.initialize(id, properties, raftStorage);
     storage.init(raftStorage);
-    writeChunkExecutor.prestartAllCoreThreads();
 
     // TODO handle snapshots
     // TODO: Add a flag that tells you that initialize has been called.
     // Check with Ratis if this feature is done in Ratis.
   }
 
+  @Override
   public TransactionContext startTransaction(RaftClientRequest request)
       throws IOException {
     final ContainerCommandRequestProto proto =
@@ -110,8 +143,12 @@ public TransactionContext startTransaction(RaftClientRequest request)
       // create the log entry proto
       final WriteChunkRequestProto commitWriteChunkProto =
-          WriteChunkRequestProto
-              .newBuilder(write)
+          WriteChunkRequestProto.newBuilder()
+              .setPipeline(write.getPipeline())
+              .setKeyName(write.getKeyName())
+              .setChunkData(write.getChunkData())
+              // skipping the data field as it is
+              // already set in statemachine data proto
               .setStage(ContainerProtos.Stage.COMMIT_DATA)
               .build();
       ContainerCommandRequestProto commitContainerCommandProto =
@@ -124,6 +161,11 @@ public TransactionContext startTransaction(RaftClientRequest request)
           .setData(getShadedByteString(commitContainerCommandProto))
           .setStateMachineData(getShadedByteString(dataContainerCommandProto))
           .build();
+    } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+      log = SMLogEntryProto.newBuilder()
+          .setData(request.getMessage().getContent())
+          .setStateMachineData(request.getMessage().getContent())
+          .build();
     } else {
       log = SMLogEntryProto.newBuilder()
           .setData(request.getMessage().getContent())
@@ -154,12 +196,30 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
     try {
       final ContainerCommandRequestProto requestProto =
           getRequestProto(entry.getSmLogEntry().getStateMachineData());
-      final WriteChunkRequestProto write = requestProto.getWriteChunk();
-      Message raftClientReply = runCommand(requestProto);
-      CompletableFuture<Message> future =
-          CompletableFuture.completedFuture(raftClientReply);
-      writeChunkMap.put(write.getChunkData().getChunkName(), future);
-      return future;
+      if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+        String containerName =
+            requestProto.getCreateContainer().getContainerData().getName();
+        createContainerFutureMap.
+            computeIfAbsent(containerName, k -> new CompletableFuture<>());
+        return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+      } else {
+        final WriteChunkRequestProto write = requestProto.getWriteChunk();
+        String containerName = write.getPipeline().getContainerName();
+        CompletableFuture<Message> future =
+            createContainerFutureMap.get(containerName);
+
+        CompletableFuture<Message> writeChunkFuture;
+        if (future != null) {
+          writeChunkFuture = future.thenApplyAsync(
+              v -> runCommand(requestProto), writeChunkExecutor);
+        } else {
+          writeChunkFuture = CompletableFuture.supplyAsync(
+              () -> runCommand(requestProto), writeChunkExecutor);
+        }
+        writeChunkFutureMap
+            .put(write.getChunkData().getChunkName(), writeChunkFuture);
+        return writeChunkFuture;
+      }
     } catch (IOException e) {
       return completeExceptionally(e);
     }
@@ -186,13 +246,21 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
       if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
         WriteChunkRequestProto write = requestProto.getWriteChunk();
+        // the data field has already been removed in startTransaction
+        Preconditions.checkArgument(!write.hasData());
         CompletableFuture<Message> stateMachineFuture =
-            writeChunkMap.remove(write.getChunkData().getChunkName());
+            writeChunkFutureMap.remove(write.getChunkData().getChunkName());
         return stateMachineFuture
             .thenComposeAsync(v ->
                 CompletableFuture.completedFuture(runCommand(requestProto)));
       } else {
-        return CompletableFuture.completedFuture(runCommand(requestProto));
+        Message message = runCommand(requestProto);
+        if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+          String containerName =
+              requestProto.getCreateContainer().getContainerData().getName();
+          createContainerFutureMap.remove(containerName).complete(message);
+        }
+        return CompletableFuture.completedFuture(message);
       }
     } catch (IOException e) {
       return completeExceptionally(e);
@@ -207,6 +275,5 @@ private static CompletableFuture<Message> completeExceptionally(Exception e) {
 
   @Override
   public void close() throws IOException {
-    writeChunkExecutor.shutdown();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7baca257e5..ff52341ed7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -28,7 +28,6 @@
     .XceiverServerSpi;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -48,6 +47,9 @@
 import java.net.ServerSocket;
 import java.net.SocketAddress;
 import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Creates a ratis server endpoint that acts as the communication layer for
@@ -57,6 +59,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
   private final int port;
   private final RaftServer server;
+  private ThreadPoolExecutor writeChunkExecutor;
 
   private XceiverServerRatis(DatanodeID id, int port, String storageDir,
       ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@@ -68,6 +71,9 @@ private XceiverServerRatis(DatanodeID id, int port, String storageDir,
     final int raftSegmentSize = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+    final int raftSegmentPreallocatedSize = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
     final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
     final int numWriteChunkThreads = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
@@ -76,28 +82,34 @@ private XceiverServerRatis(DatanodeID id, int port, String storageDir,
     Objects.requireNonNull(id, "id == null");
     this.port = port;
     RaftProperties serverProperties = newRaftProperties(rpc, port,
-        storageDir, maxChunkSize, raftSegmentSize);
+        storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize);
+    writeChunkExecutor =
+        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+            100, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(1024),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    ContainerStateMachine stateMachine =
+        new ContainerStateMachine(dispatcher, writeChunkExecutor);
     this.server = RaftServer.newBuilder()
         .setServerId(RatisHelper.toRaftPeerId(id))
         .setGroup(RatisHelper.emptyRaftGroup())
         .setProperties(serverProperties)
-        .setStateMachine(new ContainerStateMachine(dispatcher,
-            numWriteChunkThreads))
+        .setStateMachine(stateMachine)
         .build();
   }
 
   private static RaftProperties newRaftProperties(
       RpcType rpc, int port, String storageDir, int scmChunkSize,
-      int raftSegmentSize) {
+      int raftSegmentSize, int raftSegmentPreallocatedSize) {
     final RaftProperties properties = new RaftProperties();
     RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
     RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
-        SizeInBytes.valueOf(raftSegmentSize));
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
     RaftServerConfigKeys.Log.setWriteBufferSize(properties,
         SizeInBytes.valueOf(scmChunkSize));
     RaftServerConfigKeys.Log.setPreallocatedSize(properties,
-        SizeInBytes.valueOf(raftSegmentSize));
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
     RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
         SizeInBytes.valueOf(raftSegmentSize));
     RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
@@ -106,9 +118,9 @@ private static RaftProperties newRaftProperties(
     //TODO: change these configs to setter after RATIS-154
     properties.setInt("raft.server.log.segment.cache.num.max", 2);
     properties.setInt("raft.grpc.message.size.max",
-        scmChunkSize + raftSegmentSize);
-    properties.setInt("raft.server.rpc.timeout.min", 500);
-    properties.setInt("raft.server.rpc.timeout.max", 600);
+        scmChunkSize + raftSegmentPreallocatedSize);
+    properties.setInt("raft.server.rpc.timeout.min", 800);
+    properties.setInt("raft.server.rpc.timeout.max", 1000);
     if (rpc == SupportedRpcType.GRPC) {
       GrpcConfigKeys.Server.setPort(properties, port);
     } else {
@@ -171,12 +183,14 @@ public static XceiverServerRatis newXceiverServerRatis(DatanodeID datanodeID,
   public void start() throws IOException {
     LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
         server.getId(), getIPCPort());
+    writeChunkExecutor.prestartAllCoreThreads();
     server.start();
   }
 
   @Override
   public void stop() {
     try {
+      writeChunkExecutor.shutdown();
       server.close();
     } catch (IOException e) {
       throw new RuntimeException(e);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index e1da595801..434f5c7c76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -356,10 +356,18 @@
   <property>
     <name>dfs.container.ratis.segment.size</name>
-    <value>134217728</value>
+    <value>1073741824</value>
     <tag>OZONE, RATIS, PERFORMANCE</tag>
     <description>The size of the raft segment used by Apache Ratis on datanodes.
-      (128 MB by default)
+      (1 GB by default)
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.container.ratis.segment.preallocated.size</name>
+    <value>134217728</value>
+    <tag>OZONE, RATIS, PERFORMANCE</tag>
+    <description>The size of the buffer which is preallocated for the raft
+      segment used by Apache Ratis on datanodes. (128 MB by default)
     </description>
   </property>
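
The heart of the change is the pair of future maps in ContainerStateMachine: the state-machine-data phase of createContainer registers a CompletableFuture keyed by container name, and a later writeChunk for the same container chains its work onto that future via thenApplyAsync instead of running immediately, so the chunk write can only start once the container exists; applyTransaction for createContainer completes the future and releases any chained writes. The following is a minimal, self-contained Java sketch of that ordering pattern only. The class OrderingSketch, its method names, and the String results are hypothetical stand-ins invented for this illustration; they are not the Ozone or Ratis APIs used in the patch, which operate on ContainerCommandRequestProto and Message objects.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class OrderingSketch {

  // Pending createContainer operations, keyed by container name.
  private final ConcurrentHashMap<String, CompletableFuture<String>>
      createContainerFutureMap = new ConcurrentHashMap<>();
  // In-flight writeChunk data-phase operations, keyed by chunk name.
  private final ConcurrentHashMap<String, CompletableFuture<String>>
      writeChunkFutureMap = new ConcurrentHashMap<>();
  // Shared executor; CallerRunsPolicy applies back pressure when the queue fills.
  private final ThreadPoolExecutor writeChunkExecutor =
      new ThreadPoolExecutor(4, 4, 100, TimeUnit.SECONDS,
          new ArrayBlockingQueue<>(1024),
          new ThreadPoolExecutor.CallerRunsPolicy());

  // Phase 1 of createContainer: only register the pending future.
  public void startCreateContainer(String containerName) {
    createContainerFutureMap
        .computeIfAbsent(containerName, k -> new CompletableFuture<>());
  }

  // Phase 2 of createContainer: do the work and release any chained writes.
  public void applyCreateContainer(String containerName) {
    String result = "container " + containerName + " created";
    createContainerFutureMap.remove(containerName).complete(result);
  }

  // writeChunk data phase: runs only after the container exists.
  public CompletableFuture<String> writeStateMachineData(
      String containerName, String chunkName) {
    CompletableFuture<String> pendingCreate =
        createContainerFutureMap.get(containerName);

    CompletableFuture<String> writeChunkFuture;
    if (pendingCreate != null) {
      // Chain behind the pending createContainer.
      writeChunkFuture = pendingCreate.thenApplyAsync(
          v -> "wrote " + chunkName, writeChunkExecutor);
    } else {
      // Container already exists; write immediately on the executor.
      writeChunkFuture = CompletableFuture.supplyAsync(
          () -> "wrote " + chunkName, writeChunkExecutor);
    }
    writeChunkFutureMap.put(chunkName, writeChunkFuture);
    return writeChunkFuture;
  }

  // writeChunk commit phase: runs only after the data phase has finished.
  public CompletableFuture<String> applyWriteChunk(String chunkName) {
    return writeChunkFutureMap.remove(chunkName)
        .thenApply(v -> "committed " + chunkName);
  }

  public static void main(String[] args) throws Exception {
    OrderingSketch sketch = new OrderingSketch();
    sketch.startCreateContainer("c1");
    CompletableFuture<String> write =
        sketch.writeStateMachineData("c1", "c1_chunk_1");
    sketch.applyCreateContainer("c1");   // unblocks the chained chunk write
    System.out.println(write.get());     // wrote c1_chunk_1
    System.out.println(sketch.applyWriteChunk("c1_chunk_1").get());
    sketch.writeChunkExecutor.shutdown();
  }
}

The sketch also mirrors the executor ownership change in the patch: the ThreadPoolExecutor is created and shut down by the server-side owner (XceiverServerRatis in the patch) rather than inside the state machine, so its lifecycle matches the server's start and stop.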