HDDS-230. ContainerStateMachine should implement readStateMachineData api to read data from Containers if required during replication. Contributed by Mukul Kumar Singh.

This commit is contained in:
Mukul Kumar Singh 2018-08-07 15:03:14 +05:30
parent 2e4e02b4df
commit 900c0e114f
4 changed files with 129 additions and 27 deletions

View File

@ -19,20 +19,26 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.shaded.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.WriteChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ReadChunkResponseProto;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
@ -96,16 +102,16 @@ public class ContainerStateMachine extends BaseStateMachine {
private final SimpleStateMachineStorage storage
= new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
private ThreadPoolExecutor writeChunkExecutor;
private ThreadPoolExecutor chunkExecutor;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
createContainerFutureMap;
ContainerStateMachine(ContainerDispatcher dispatcher,
ThreadPoolExecutor writeChunkExecutor) {
ThreadPoolExecutor chunkExecutor) {
this.dispatcher = dispatcher;
this.writeChunkExecutor = writeChunkExecutor;
this.chunkExecutor = chunkExecutor;
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.createContainerFutureMap = new ConcurrentHashMap<>();
}
@ -117,9 +123,9 @@ public StateMachineStorage getStateMachineStorage() {
@Override
public void initialize(
RaftPeerId id, RaftProperties properties, RaftStorage raftStorage)
RaftServer server, RaftGroupId id, RaftStorage raftStorage)
throws IOException {
super.initialize(id, properties, raftStorage);
super.initialize(server, id, raftStorage);
storage.init(raftStorage);
// TODO handle snapshots
@ -134,13 +140,13 @@ public TransactionContext startTransaction(RaftClientRequest request)
getRequestProto(request.getMessage().getContent());
final SMLogEntryProto log;
if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) {
if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
// create the state machine data proto
final WriteChunkRequestProto dataWriteChunkProto =
WriteChunkRequestProto
.newBuilder(write)
.setStage(ContainerProtos.Stage.WRITE_DATA)
.setStage(Stage.WRITE_DATA)
.build();
ContainerCommandRequestProto dataContainerCommandProto =
ContainerCommandRequestProto
@ -155,7 +161,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
.setChunkData(write.getChunkData())
// skipping the data field as it is
// already set in statemachine data proto
.setStage(ContainerProtos.Stage.COMMIT_DATA)
.setStage(Stage.COMMIT_DATA)
.build();
ContainerCommandRequestProto commitContainerCommandProto =
ContainerCommandRequestProto
@ -167,7 +173,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
.setData(commitContainerCommandProto.toByteString())
.setStateMachineData(dataContainerCommandProto.toByteString())
.build();
} else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
} else if (proto.getCmdType() == Type.CreateContainer) {
log = SMLogEntryProto.newBuilder()
.setData(request.getMessage().getContent())
.setStateMachineData(request.getMessage().getContent())
@ -185,11 +191,16 @@ private ContainerCommandRequestProto getRequestProto(ByteString request)
return ContainerCommandRequestProto.parseFrom(request);
}
private Message runCommand(ContainerCommandRequestProto requestProto) {
private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto) {
LOG.trace("dispatch {}", requestProto);
ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
LOG.trace("response {}", response);
return () -> response.toByteString();
return response;
}
private Message runCommand(ContainerCommandRequestProto requestProto) {
return dispatchCommand(requestProto)::toByteString;
}
private CompletableFuture<Message> handleWriteChunk(
@ -201,10 +212,10 @@ private CompletableFuture<Message> handleWriteChunk(
CompletableFuture<Message> writeChunkFuture;
if (future != null) {
writeChunkFuture = future.thenApplyAsync(
v -> runCommand(requestProto), writeChunkExecutor);
v -> runCommand(requestProto), chunkExecutor);
} else {
writeChunkFuture = CompletableFuture.supplyAsync(
() -> runCommand(requestProto), writeChunkExecutor);
() -> runCommand(requestProto), chunkExecutor);
}
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
return writeChunkFuture;
@ -227,7 +238,7 @@ public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
try {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
ContainerProtos.Type cmdType = requestProto.getCmdType();
Type cmdType = requestProto.getCmdType();
switch (cmdType) {
case CreateContainer:
return handleCreateContainer(requestProto);
@ -253,6 +264,97 @@ public CompletableFuture<Message> query(Message request) {
}
}
private LogEntryProto readStateMachineData(SMLogEntryProto smLogEntryProto,
ContainerCommandRequestProto requestProto) {
WriteChunkRequestProto writeChunkRequestProto =
requestProto.getWriteChunk();
// Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is
// written through writeStateMachineData.
Preconditions.checkArgument(writeChunkRequestProto.getStage()
== Stage.COMMIT_DATA);
// prepare the chunk to be read
ReadChunkRequestProto.Builder readChunkRequestProto =
ReadChunkRequestProto.newBuilder()
.setBlockID(writeChunkRequestProto.getBlockID())
.setChunkData(writeChunkRequestProto.getChunkData());
ContainerCommandRequestProto dataContainerCommandProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setCmdType(Type.ReadChunk)
.setReadChunk(readChunkRequestProto)
.build();
// read the chunk
ContainerCommandResponseProto response =
dispatchCommand(dataContainerCommandProto);
ReadChunkResponseProto responseProto = response.getReadChunk();
// assert that the response has data in it.
Preconditions.checkNotNull(responseProto.getData());
// reconstruct the write chunk request
final WriteChunkRequestProto.Builder dataWriteChunkProto =
WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
// adding the state machine data
.setData(responseProto.getData())
.setStage(Stage.WRITE_DATA);
ContainerCommandRequestProto.Builder newStateMachineProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setWriteChunk(dataWriteChunkProto);
return recreateLogEntryProto(smLogEntryProto,
newStateMachineProto.build().toByteString());
}
private LogEntryProto recreateLogEntryProto(SMLogEntryProto smLogEntryProto,
ByteString stateMachineData) {
// recreate the log entry
final SMLogEntryProto log =
SMLogEntryProto.newBuilder(smLogEntryProto)
.setStateMachineData(stateMachineData)
.build();
return LogEntryProto.newBuilder().setSmLogEntry(log).build();
}
/*
* This api is used by the leader while appending logs to the follower
* This allows the leader to read the state machine data from the
* state machine implementation in case cached state machine data has been
* evicted.
*/
@Override
public CompletableFuture<LogEntryProto> readStateMachineData(
LogEntryProto entry) {
SMLogEntryProto smLogEntryProto = entry.getSmLogEntry();
if (!smLogEntryProto.getStateMachineData().isEmpty()) {
return CompletableFuture.completedFuture(entry);
}
try {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getData());
// readStateMachineData should only be called for "write" to Ratis.
Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
if (requestProto.getCmdType() == Type.WriteChunk) {
return CompletableFuture.supplyAsync(() ->
readStateMachineData(smLogEntryProto, requestProto),
chunkExecutor);
} else if (requestProto.getCmdType() == Type.CreateContainer) {
LogEntryProto log =
recreateLogEntryProto(smLogEntryProto, requestProto.toByteString());
return CompletableFuture.completedFuture(log);
} else {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+ " cannot have state machine data");
}
} catch (Exception e) {
LOG.error("unable to read stateMachineData:" + e);
return completeExceptionally(e);
}
}
/*
* ApplyTransaction calls in Ratis are sequential.
*/
@ -261,9 +363,9 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
try {
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getSMLogEntry().getData());
ContainerProtos.Type cmdType = requestProto.getCmdType();
Type cmdType = requestProto.getCmdType();
if (cmdType == ContainerProtos.Type.WriteChunk) {
if (cmdType == Type.WriteChunk) {
WriteChunkRequestProto write = requestProto.getWriteChunk();
// the data field has already been removed in start Transaction
Preconditions.checkArgument(!write.hasData());
@ -274,7 +376,7 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
CompletableFuture.completedFuture(runCommand(requestProto)));
} else {
Message message = runCommand(requestProto);
if (cmdType == ContainerProtos.Type.CreateContainer) {
if (cmdType == Type.CreateContainer) {
long containerID = requestProto.getContainerID();
createContainerFutureMap.remove(containerID).complete(message);
}

View File

@ -72,7 +72,7 @@ private static long nextCallId() {
private final int port;
private final RaftServer server;
private ThreadPoolExecutor writeChunkExecutor;
private ThreadPoolExecutor chunkExecutor;
private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@ -117,13 +117,13 @@ private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
setRequestTimeout(serverProperties, clientRequestTimeout,
serverRequestTimeout);
writeChunkExecutor =
chunkExecutor =
new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
ContainerStateMachine stateMachine =
new ContainerStateMachine(dispatcher, writeChunkExecutor);
new ContainerStateMachine(dispatcher, chunkExecutor);
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
.setGroup(RatisHelper.emptyRaftGroup())
@ -225,14 +225,14 @@ public static XceiverServerRatis newXceiverServerRatis(
public void start() throws IOException {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), getIPCPort());
writeChunkExecutor.prestartAllCoreThreads();
chunkExecutor.prestartAllCoreThreads();
server.start();
}
@Override
public void stop() {
try {
writeChunkExecutor.shutdown();
chunkExecutor.shutdown();
server.close();
} catch (IOException e) {
throw new RuntimeException(e);

View File

@ -88,7 +88,7 @@ public OMMetrics() {
public static OMMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME,
"Oozne Manager Metrics",
"Ozone Manager Metrics",
new OMMetrics());
}

View File

@ -97,7 +97,7 @@
<ldap-api.version>1.0.0-M33</ldap-api.version>
<!-- Apache Ratis version -->
<ratis.version>0.1.1-alpha-d7d7061-SNAPSHOT</ratis.version>
<ratis.version>0.3.0-c242317-SNAPSHOT</ratis.version>
<jcache.version>1.0-alpha-1</jcache.version>
<ehcache.version>3.3.1</ehcache.version>
<hikari.version>2.4.12</hikari.version>