diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index c0dd0ba080..15e991a3e0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -19,20 +19,26 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.HddsUtils; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.shaded.com.google.protobuf .InvalidProtocolBufferException; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .WriteChunkRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ReadChunkRequestProto; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos + .ReadChunkResponseProto; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.shaded.com.google.protobuf.ByteString; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; @@ -96,16 +102,16 @@ public class ContainerStateMachine extends BaseStateMachine { private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); private final ContainerDispatcher dispatcher; - private ThreadPoolExecutor writeChunkExecutor; + private ThreadPoolExecutor chunkExecutor; private final ConcurrentHashMap> writeChunkFutureMap; private final ConcurrentHashMap> createContainerFutureMap; ContainerStateMachine(ContainerDispatcher dispatcher, - ThreadPoolExecutor writeChunkExecutor) { + ThreadPoolExecutor chunkExecutor) { this.dispatcher = dispatcher; - this.writeChunkExecutor = writeChunkExecutor; + this.chunkExecutor = chunkExecutor; this.writeChunkFutureMap = new ConcurrentHashMap<>(); this.createContainerFutureMap = new ConcurrentHashMap<>(); } @@ -117,9 +123,9 @@ public StateMachineStorage getStateMachineStorage() { @Override public void initialize( - RaftPeerId id, RaftProperties properties, RaftStorage raftStorage) + RaftServer server, RaftGroupId id, RaftStorage raftStorage) throws IOException { - super.initialize(id, properties, raftStorage); + super.initialize(server, id, raftStorage); storage.init(raftStorage); // TODO handle snapshots @@ -134,13 +140,13 @@ public TransactionContext startTransaction(RaftClientRequest request) getRequestProto(request.getMessage().getContent()); final SMLogEntryProto log; - if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) { + if (proto.getCmdType() == Type.WriteChunk) { final WriteChunkRequestProto write = proto.getWriteChunk(); // create the state machine data proto final WriteChunkRequestProto dataWriteChunkProto = WriteChunkRequestProto .newBuilder(write) - .setStage(ContainerProtos.Stage.WRITE_DATA) + .setStage(Stage.WRITE_DATA) .build(); ContainerCommandRequestProto dataContainerCommandProto = ContainerCommandRequestProto @@ -155,7 +161,7 @@ public TransactionContext startTransaction(RaftClientRequest request) .setChunkData(write.getChunkData()) // skipping the data field as it is // already set in statemachine data proto - .setStage(ContainerProtos.Stage.COMMIT_DATA) + .setStage(Stage.COMMIT_DATA) .build(); ContainerCommandRequestProto commitContainerCommandProto = ContainerCommandRequestProto @@ -167,7 +173,7 @@ public TransactionContext startTransaction(RaftClientRequest request) .setData(commitContainerCommandProto.toByteString()) .setStateMachineData(dataContainerCommandProto.toByteString()) .build(); - } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) { + } else if (proto.getCmdType() == Type.CreateContainer) { log = SMLogEntryProto.newBuilder() .setData(request.getMessage().getContent()) .setStateMachineData(request.getMessage().getContent()) @@ -185,11 +191,16 @@ private ContainerCommandRequestProto getRequestProto(ByteString request) return ContainerCommandRequestProto.parseFrom(request); } - private Message runCommand(ContainerCommandRequestProto requestProto) { + private ContainerCommandResponseProto dispatchCommand( + ContainerCommandRequestProto requestProto) { LOG.trace("dispatch {}", requestProto); ContainerCommandResponseProto response = dispatcher.dispatch(requestProto); LOG.trace("response {}", response); - return () -> response.toByteString(); + return response; + } + + private Message runCommand(ContainerCommandRequestProto requestProto) { + return dispatchCommand(requestProto)::toByteString; } private CompletableFuture handleWriteChunk( @@ -201,10 +212,10 @@ private CompletableFuture handleWriteChunk( CompletableFuture writeChunkFuture; if (future != null) { writeChunkFuture = future.thenApplyAsync( - v -> runCommand(requestProto), writeChunkExecutor); + v -> runCommand(requestProto), chunkExecutor); } else { writeChunkFuture = CompletableFuture.supplyAsync( - () -> runCommand(requestProto), writeChunkExecutor); + () -> runCommand(requestProto), chunkExecutor); } writeChunkFutureMap.put(entryIndex, writeChunkFuture); return writeChunkFuture; @@ -227,7 +238,7 @@ public CompletableFuture writeStateMachineData(LogEntryProto entry) { try { final ContainerCommandRequestProto requestProto = getRequestProto(entry.getSmLogEntry().getStateMachineData()); - ContainerProtos.Type cmdType = requestProto.getCmdType(); + Type cmdType = requestProto.getCmdType(); switch (cmdType) { case CreateContainer: return handleCreateContainer(requestProto); @@ -253,6 +264,97 @@ public CompletableFuture query(Message request) { } } + private LogEntryProto readStateMachineData(SMLogEntryProto smLogEntryProto, + ContainerCommandRequestProto requestProto) { + WriteChunkRequestProto writeChunkRequestProto = + requestProto.getWriteChunk(); + // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is + // written through writeStateMachineData. + Preconditions.checkArgument(writeChunkRequestProto.getStage() + == Stage.COMMIT_DATA); + + // prepare the chunk to be read + ReadChunkRequestProto.Builder readChunkRequestProto = + ReadChunkRequestProto.newBuilder() + .setBlockID(writeChunkRequestProto.getBlockID()) + .setChunkData(writeChunkRequestProto.getChunkData()); + ContainerCommandRequestProto dataContainerCommandProto = + ContainerCommandRequestProto.newBuilder(requestProto) + .setCmdType(Type.ReadChunk) + .setReadChunk(readChunkRequestProto) + .build(); + + // read the chunk + ContainerCommandResponseProto response = + dispatchCommand(dataContainerCommandProto); + ReadChunkResponseProto responseProto = response.getReadChunk(); + + // assert that the response has data in it. + Preconditions.checkNotNull(responseProto.getData()); + + // reconstruct the write chunk request + final WriteChunkRequestProto.Builder dataWriteChunkProto = + WriteChunkRequestProto.newBuilder(writeChunkRequestProto) + // adding the state machine data + .setData(responseProto.getData()) + .setStage(Stage.WRITE_DATA); + + ContainerCommandRequestProto.Builder newStateMachineProto = + ContainerCommandRequestProto.newBuilder(requestProto) + .setWriteChunk(dataWriteChunkProto); + + return recreateLogEntryProto(smLogEntryProto, + newStateMachineProto.build().toByteString()); + } + + private LogEntryProto recreateLogEntryProto(SMLogEntryProto smLogEntryProto, + ByteString stateMachineData) { + // recreate the log entry + final SMLogEntryProto log = + SMLogEntryProto.newBuilder(smLogEntryProto) + .setStateMachineData(stateMachineData) + .build(); + return LogEntryProto.newBuilder().setSmLogEntry(log).build(); + } + + /* + * This api is used by the leader while appending logs to the follower + * This allows the leader to read the state machine data from the + * state machine implementation in case cached state machine data has been + * evicted. + */ + @Override + public CompletableFuture readStateMachineData( + LogEntryProto entry) { + SMLogEntryProto smLogEntryProto = entry.getSmLogEntry(); + if (!smLogEntryProto.getStateMachineData().isEmpty()) { + return CompletableFuture.completedFuture(entry); + } + + try { + final ContainerCommandRequestProto requestProto = + getRequestProto(entry.getSmLogEntry().getData()); + // readStateMachineData should only be called for "write" to Ratis. + Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto)); + + if (requestProto.getCmdType() == Type.WriteChunk) { + return CompletableFuture.supplyAsync(() -> + readStateMachineData(smLogEntryProto, requestProto), + chunkExecutor); + } else if (requestProto.getCmdType() == Type.CreateContainer) { + LogEntryProto log = + recreateLogEntryProto(smLogEntryProto, requestProto.toByteString()); + return CompletableFuture.completedFuture(log); + } else { + throw new IllegalStateException("Cmd type:" + requestProto.getCmdType() + + " cannot have state machine data"); + } + } catch (Exception e) { + LOG.error("unable to read stateMachineData:" + e); + return completeExceptionally(e); + } + } + /* * ApplyTransaction calls in Ratis are sequential. */ @@ -261,9 +363,9 @@ public CompletableFuture applyTransaction(TransactionContext trx) { try { ContainerCommandRequestProto requestProto = getRequestProto(trx.getSMLogEntry().getData()); - ContainerProtos.Type cmdType = requestProto.getCmdType(); + Type cmdType = requestProto.getCmdType(); - if (cmdType == ContainerProtos.Type.WriteChunk) { + if (cmdType == Type.WriteChunk) { WriteChunkRequestProto write = requestProto.getWriteChunk(); // the data field has already been removed in start Transaction Preconditions.checkArgument(!write.hasData()); @@ -274,7 +376,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { CompletableFuture.completedFuture(runCommand(requestProto))); } else { Message message = runCommand(requestProto); - if (cmdType == ContainerProtos.Type.CreateContainer) { + if (cmdType == Type.CreateContainer) { long containerID = requestProto.getContainerID(); createContainerFutureMap.remove(containerID).complete(message); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index b9c7cae493..723b94ae2d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -72,7 +72,7 @@ private static long nextCallId() { private final int port; private final RaftServer server; - private ThreadPoolExecutor writeChunkExecutor; + private ThreadPoolExecutor chunkExecutor; private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir, ContainerDispatcher dispatcher, Configuration conf) throws IOException { @@ -117,13 +117,13 @@ private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir, setRequestTimeout(serverProperties, clientRequestTimeout, serverRequestTimeout); - writeChunkExecutor = + chunkExecutor = new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy()); ContainerStateMachine stateMachine = - new ContainerStateMachine(dispatcher, writeChunkExecutor); + new ContainerStateMachine(dispatcher, chunkExecutor); this.server = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(dd)) .setGroup(RatisHelper.emptyRaftGroup()) @@ -225,14 +225,14 @@ public static XceiverServerRatis newXceiverServerRatis( public void start() throws IOException { LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), server.getId(), getIPCPort()); - writeChunkExecutor.prestartAllCoreThreads(); + chunkExecutor.prestartAllCoreThreads(); server.start(); } @Override public void stop() { try { - writeChunkExecutor.shutdown(); + chunkExecutor.shutdown(); server.close(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java index 2d044521ed..b8cfc97059 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java @@ -88,7 +88,7 @@ public OMMetrics() { public static OMMetrics create() { MetricsSystem ms = DefaultMetricsSystem.instance(); return ms.register(SOURCE_NAME, - "Oozne Manager Metrics", + "Ozone Manager Metrics", new OMMetrics()); } diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 387a3da0f8..7603842bf1 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -97,7 +97,7 @@ 1.0.0-M33 - 0.1.1-alpha-d7d7061-SNAPSHOT + 0.3.0-c242317-SNAPSHOT 1.0-alpha-1 3.3.1 2.4.12