HDFS-13024. Ozone: ContainerStateMachine should synchronize operations between createContainer and writeChunk. Contributed by Mukul Kumar Singh.

This commit is contained in:
Mukul Kumar Singh 2018-01-23 11:19:46 +05:30 committed by Owen O'Malley
parent 4a051ba494
commit 94c0346f35
5 changed files with 134 additions and 37 deletions

View File

@ -229,6 +229,10 @@ public final class OzoneConfigKeys {
= ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY; = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT; = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT;
public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY
= ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY;
public static final int DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT;
public static final int DFS_CONTAINER_CHUNK_MAX_SIZE public static final int DFS_CONTAINER_CHUNK_MAX_SIZE
= ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR = public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =

View File

@ -57,7 +57,11 @@ public final class ScmConfigKeys {
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY = public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
"dfs.container.ratis.segment.size"; "dfs.container.ratis.segment.size";
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT = public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
128 * 1024 * 1024; 1 * 1024 * 1024 * 1024;
public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
"dfs.container.ratis.segment.preallocated.size";
public static final int
DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = 128 * 1024 * 1024;
// TODO : this is copied from OzoneConsts, may need to move to a better place // TODO : this is copied from OzoneConsts, may need to move to a better place
public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size"; public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis; package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
@ -45,29 +46,61 @@
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ArrayBlockingQueue;
/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */ /** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
*
* The stateMachine is responsible for handling different types of container
* requests. The container requests can be divided into readonly and write
* requests.
*
* Read only requests are classified in
* {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
* and these readonly requests are replied from the
* {@link #query(RaftClientRequest)}
*
* The write requests can be divided into requests with user data
* (WriteChunkRequest) and other request without user data.
*
* In order to optimize the write throughput, the writeChunk request is
* processed in 2 phases. The 2 phases are divided in
* {@link #startTransaction(RaftClientRequest)}. In the first phase the user
* data is written directly into the state machine via
* {@link #writeStateMachineData} and in the second phase the
* transaction is committed via {@link #applyTransaction(TransactionContext)}
*
* For the requests with no stateMachine data, the transaction is directly
* committed through
* {@link #applyTransaction(TransactionContext)}
*
* There are 2 ordering constraints which are enforced right now in the code,
* 1) Write chunk operations are executed after the create container operation,
* the write chunk operation will fail otherwise as the container still hasn't
* been created. Hence the create container operation has been split in the
* {@link #startTransaction(RaftClientRequest)}, this will help in synchronizing
* the calls in {@link #writeStateMachineData}
*
* 2) Write chunk commit operation is executed after write chunk state machine
* operation. This will ensure that commit operation is sync'd with the state
* machine operation.
* */
public class ContainerStateMachine extends BaseStateMachine { public class ContainerStateMachine extends BaseStateMachine {
static final Logger LOG = LoggerFactory.getLogger( static final Logger LOG = LoggerFactory.getLogger(
ContainerStateMachine.class); ContainerStateMachine.class);
private final SimpleStateMachineStorage storage private final SimpleStateMachineStorage storage
= new SimpleStateMachineStorage(); = new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher; private final ContainerDispatcher dispatcher;
private final ThreadPoolExecutor writeChunkExecutor; private ThreadPoolExecutor writeChunkExecutor;
private final ConcurrentHashMap<String, CompletableFuture<Message>> private final ConcurrentHashMap<String, CompletableFuture<Message>>
writeChunkMap; writeChunkFutureMap;
private final ConcurrentHashMap<String, CompletableFuture<Message>>
createContainerFutureMap;
ContainerStateMachine(ContainerDispatcher dispatcher, ContainerStateMachine(ContainerDispatcher dispatcher,
int numWriteChunkThreads) { ThreadPoolExecutor writeChunkExecutor) {
this.dispatcher = dispatcher; this.dispatcher = dispatcher;
writeChunkMap = new ConcurrentHashMap<>(); this.writeChunkExecutor = writeChunkExecutor;
writeChunkExecutor = this.writeChunkFutureMap = new ConcurrentHashMap<>();
new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, this.createContainerFutureMap = new ConcurrentHashMap<>();
60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
} }
@Override @Override
@ -81,13 +114,13 @@ public void initialize(
throws IOException { throws IOException {
super.initialize(id, properties, raftStorage); super.initialize(id, properties, raftStorage);
storage.init(raftStorage); storage.init(raftStorage);
writeChunkExecutor.prestartAllCoreThreads();
// TODO handle snapshots // TODO handle snapshots
// TODO: Add a flag that tells you that initialize has been called. // TODO: Add a flag that tells you that initialize has been called.
// Check with Ratis if this feature is done in Ratis. // Check with Ratis if this feature is done in Ratis.
} }
@Override
public TransactionContext startTransaction(RaftClientRequest request) public TransactionContext startTransaction(RaftClientRequest request)
throws IOException { throws IOException {
final ContainerCommandRequestProto proto = final ContainerCommandRequestProto proto =
@ -110,8 +143,12 @@ public TransactionContext startTransaction(RaftClientRequest request)
// create the log entry proto // create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto = final WriteChunkRequestProto commitWriteChunkProto =
WriteChunkRequestProto WriteChunkRequestProto.newBuilder()
.newBuilder(write) .setPipeline(write.getPipeline())
.setKeyName(write.getKeyName())
.setChunkData(write.getChunkData())
// skipping the data field as it is
// already set in statemachine data proto
.setStage(ContainerProtos.Stage.COMMIT_DATA) .setStage(ContainerProtos.Stage.COMMIT_DATA)
.build(); .build();
ContainerCommandRequestProto commitContainerCommandProto = ContainerCommandRequestProto commitContainerCommandProto =
@ -124,6 +161,11 @@ public TransactionContext startTransaction(RaftClientRequest request)
.setData(getShadedByteString(commitContainerCommandProto)) .setData(getShadedByteString(commitContainerCommandProto))
.setStateMachineData(getShadedByteString(dataContainerCommandProto)) .setStateMachineData(getShadedByteString(dataContainerCommandProto))
.build(); .build();
} else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
log = SMLogEntryProto.newBuilder()
.setData(request.getMessage().getContent())
.setStateMachineData(request.getMessage().getContent())
.build();
} else { } else {
log = SMLogEntryProto.newBuilder() log = SMLogEntryProto.newBuilder()
.setData(request.getMessage().getContent()) .setData(request.getMessage().getContent())
@ -154,12 +196,30 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
try { try {
final ContainerCommandRequestProto requestProto = final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData()); getRequestProto(entry.getSmLogEntry().getStateMachineData());
final WriteChunkRequestProto write = requestProto.getWriteChunk(); if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
Message raftClientReply = runCommand(requestProto); String containerName =
CompletableFuture<Message> future = requestProto.getCreateContainer().getContainerData().getName();
CompletableFuture.completedFuture(raftClientReply); createContainerFutureMap.
writeChunkMap.put(write.getChunkData().getChunkName(),future); computeIfAbsent(containerName, k -> new CompletableFuture<>());
return future; return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
} else {
final WriteChunkRequestProto write = requestProto.getWriteChunk();
String containerName = write.getPipeline().getContainerName();
CompletableFuture<Message> future =
createContainerFutureMap.get(containerName);
CompletableFuture<Message> writeChunkFuture;
if (future != null) {
writeChunkFuture = future.thenApplyAsync(
v -> runCommand(requestProto), writeChunkExecutor);
} else {
writeChunkFuture = CompletableFuture.supplyAsync(
() -> runCommand(requestProto), writeChunkExecutor);
}
writeChunkFutureMap
.put(write.getChunkData().getChunkName(), writeChunkFuture);
return writeChunkFuture;
}
} catch (IOException e) { } catch (IOException e) {
return completeExceptionally(e); return completeExceptionally(e);
} }
@ -186,13 +246,21 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) { if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
WriteChunkRequestProto write = requestProto.getWriteChunk(); WriteChunkRequestProto write = requestProto.getWriteChunk();
// the data field has already been removed in start Transaction
Preconditions.checkArgument(!write.hasData());
CompletableFuture<Message> stateMachineFuture = CompletableFuture<Message> stateMachineFuture =
writeChunkMap.remove(write.getChunkData().getChunkName()); writeChunkFutureMap.remove(write.getChunkData().getChunkName());
return stateMachineFuture return stateMachineFuture
.thenComposeAsync(v -> .thenComposeAsync(v ->
CompletableFuture.completedFuture(runCommand(requestProto))); CompletableFuture.completedFuture(runCommand(requestProto)));
} else { } else {
return CompletableFuture.completedFuture(runCommand(requestProto)); Message message = runCommand(requestProto);
if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
String containerName =
requestProto.getCreateContainer().getContainerData().getName();
createContainerFutureMap.remove(containerName).complete(message);
}
return CompletableFuture.completedFuture(message);
} }
} catch (IOException e) { } catch (IOException e) {
return completeExceptionally(e); return completeExceptionally(e);
@ -207,6 +275,5 @@ private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
writeChunkExecutor.shutdown();
} }
} }

View File

@ -28,7 +28,6 @@
.XceiverServerSpi; .XceiverServerSpi;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcConfigKeys;
@ -48,6 +47,9 @@
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** /**
* Creates a ratis server endpoint that acts as the communication layer for * Creates a ratis server endpoint that acts as the communication layer for
@ -57,6 +59,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class); static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
private final int port; private final int port;
private final RaftServer server; private final RaftServer server;
private ThreadPoolExecutor writeChunkExecutor;
private XceiverServerRatis(DatanodeID id, int port, String storageDir, private XceiverServerRatis(DatanodeID id, int port, String storageDir,
ContainerDispatcher dispatcher, Configuration conf) throws IOException { ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@ -68,6 +71,9 @@ private XceiverServerRatis(DatanodeID id, int port, String storageDir,
final int raftSegmentSize = conf.getInt( final int raftSegmentSize = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT); OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
final int raftSegmentPreallocatedSize = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE; final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
final int numWriteChunkThreads = conf.getInt( final int numWriteChunkThreads = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
@ -76,28 +82,34 @@ private XceiverServerRatis(DatanodeID id, int port, String storageDir,
Objects.requireNonNull(id, "id == null"); Objects.requireNonNull(id, "id == null");
this.port = port; this.port = port;
RaftProperties serverProperties = newRaftProperties(rpc, port, RaftProperties serverProperties = newRaftProperties(rpc, port,
storageDir, maxChunkSize, raftSegmentSize); storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize);
writeChunkExecutor =
new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
ContainerStateMachine stateMachine =
new ContainerStateMachine(dispatcher, writeChunkExecutor);
this.server = RaftServer.newBuilder() this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(id)) .setServerId(RatisHelper.toRaftPeerId(id))
.setGroup(RatisHelper.emptyRaftGroup()) .setGroup(RatisHelper.emptyRaftGroup())
.setProperties(serverProperties) .setProperties(serverProperties)
.setStateMachine(new ContainerStateMachine(dispatcher, .setStateMachine(stateMachine)
numWriteChunkThreads))
.build(); .build();
} }
private static RaftProperties newRaftProperties( private static RaftProperties newRaftProperties(
RpcType rpc, int port, String storageDir, int scmChunkSize, RpcType rpc, int port, String storageDir, int scmChunkSize,
int raftSegmentSize) { int raftSegmentSize, int raftSegmentPreallocatedSize) {
final RaftProperties properties = new RaftProperties(); final RaftProperties properties = new RaftProperties();
RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true); RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties, RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
SizeInBytes.valueOf(raftSegmentSize)); SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.setWriteBufferSize(properties, RaftServerConfigKeys.Log.setWriteBufferSize(properties,
SizeInBytes.valueOf(scmChunkSize)); SizeInBytes.valueOf(scmChunkSize));
RaftServerConfigKeys.Log.setPreallocatedSize(properties, RaftServerConfigKeys.Log.setPreallocatedSize(properties,
SizeInBytes.valueOf(raftSegmentSize)); SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.setSegmentSizeMax(properties, RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
SizeInBytes.valueOf(raftSegmentSize)); SizeInBytes.valueOf(raftSegmentSize));
RaftServerConfigKeys.setStorageDir(properties, new File(storageDir)); RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
@ -106,9 +118,9 @@ private static RaftProperties newRaftProperties(
//TODO: change these configs to setter after RATIS-154 //TODO: change these configs to setter after RATIS-154
properties.setInt("raft.server.log.segment.cache.num.max", 2); properties.setInt("raft.server.log.segment.cache.num.max", 2);
properties.setInt("raft.grpc.message.size.max", properties.setInt("raft.grpc.message.size.max",
scmChunkSize + raftSegmentSize); scmChunkSize + raftSegmentPreallocatedSize);
properties.setInt("raft.server.rpc.timeout.min", 500); properties.setInt("raft.server.rpc.timeout.min", 800);
properties.setInt("raft.server.rpc.timeout.max", 600); properties.setInt("raft.server.rpc.timeout.max", 1000);
if (rpc == SupportedRpcType.GRPC) { if (rpc == SupportedRpcType.GRPC) {
GrpcConfigKeys.Server.setPort(properties, port); GrpcConfigKeys.Server.setPort(properties, port);
} else { } else {
@ -171,12 +183,14 @@ public static XceiverServerRatis newXceiverServerRatis(DatanodeID datanodeID,
public void start() throws IOException { public void start() throws IOException {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), getIPCPort()); server.getId(), getIPCPort());
writeChunkExecutor.prestartAllCoreThreads();
server.start(); server.start();
} }
@Override @Override
public void stop() { public void stop() {
try { try {
writeChunkExecutor.shutdown();
server.close(); server.close();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -356,10 +356,18 @@
</property> </property>
<property> <property>
<name>dfs.container.ratis.segment.size</name> <name>dfs.container.ratis.segment.size</name>
<value>134217728</value> <value>1073741824</value>
<tag>OZONE, RATIS, PERFORMANCE</tag> <tag>OZONE, RATIS, PERFORMANCE</tag>
<description>The size of the raft segment used by Apache Ratis on datanodes. <description>The size of the raft segment used by Apache Ratis on datanodes.
(128 MB by default) (1 GB by default)
</description>
</property>
<property>
<name>dfs.container.ratis.segment.preallocated.size</name>
<value>134217728</value>
<tag>OZONE, RATIS, PERFORMANCE</tag>
<description>The size of the buffer which is preallocated for raft segment
used by Apache Ratis on datanodes. (128 MB by default)
</description> </description>
</property> </property>
<property> <property>