diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index ed7e099a65..0fc2d0d99b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -78,7 +78,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -277,7 +276,7 @@ public long takeSnapshot() throws IOException { public TransactionContext startTransaction(RaftClientRequest request) throws IOException { final ContainerCommandRequestProto proto = - getRequestProto(request.getMessage().getContent()); + getContainerCommandRequestProto(request.getMessage().getContent()); Preconditions.checkArgument(request.getRaftGroupId().equals(gid)); try (Scope scope = TracingUtil .importAndCreateScope(proto.getCmdType().name(), proto.getTraceID())) { @@ -294,17 +293,6 @@ public TransactionContext startTransaction(RaftClientRequest request) } if (proto.getCmdType() == Type.WriteChunk) { final WriteChunkRequestProto write = proto.getWriteChunk(); - // create the state machine data proto - final WriteChunkRequestProto dataWriteChunkProto = - WriteChunkRequestProto - .newBuilder(write) - .build(); - ContainerCommandRequestProto dataContainerCommandProto = - ContainerCommandRequestProto - .newBuilder(proto) - .setWriteChunk(dataWriteChunkProto) - .build(); - // create the log entry proto final WriteChunkRequestProto commitWriteChunkProto = WriteChunkRequestProto.newBuilder() @@ -323,7 +311,7 @@ public TransactionContext startTransaction(RaftClientRequest request) .setClientRequest(request) .setStateMachine(this) .setServerRole(RaftPeerRole.LEADER) - .setStateMachineData(dataContainerCommandProto.toByteString()) + .setStateMachineData(write.getData()) .setLogData(commitContainerCommandProto.toByteString()) .build(); } else { @@ -341,8 +329,8 @@ private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) { return entryProto.getStateMachineEntry().getStateMachineData(); } - private ContainerCommandRequestProto getRequestProto(ByteString request) - throws InvalidProtocolBufferException { + private ContainerCommandRequestProto getContainerCommandRequestProto( + ByteString request) throws InvalidProtocolBufferException { // TODO: We can avoid creating new builder and set pipeline Id if // the client is already sending the pipeline id, then we just have to // validate the pipeline Id. @@ -353,7 +341,9 @@ private ContainerCommandRequestProto getRequestProto(ByteString request) private ContainerCommandResponseProto dispatchCommand( ContainerCommandRequestProto requestProto, DispatcherContext context) { - LOG.trace("dispatch {}", requestProto); + LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}", + requestProto.getCmdType(), requestProto.getContainerID(), + requestProto.getPipelineID(), requestProto.getTraceID()); if (isBlockTokenEnabled) { try { // ServerInterceptors intercepts incoming request and creates ugi. @@ -432,8 +422,15 @@ private CompletableFuture handleWriteChunk( public CompletableFuture writeStateMachineData(LogEntryProto entry) { try { metrics.incNumWriteStateMachineOps(); - final ContainerCommandRequestProto requestProto = - getRequestProto(getStateMachineData(entry.getStateMachineLogEntry())); + ContainerCommandRequestProto requestProto = + getContainerCommandRequestProto( + entry.getStateMachineLogEntry().getLogData()); + WriteChunkRequestProto writeChunk = + WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk()) + .setData(getStateMachineData(entry.getStateMachineLogEntry())) + .build(); + requestProto = ContainerCommandRequestProto.newBuilder(requestProto) + .setWriteChunk(writeChunk).build(); Type cmdType = requestProto.getCmdType(); // For only writeChunk, there will be writeStateMachineData call. @@ -457,7 +454,7 @@ public CompletableFuture query(Message request) { try { metrics.incNumReadStateMachineOps(); final ContainerCommandRequestProto requestProto = - getRequestProto(request.getContent()); + getContainerCommandRequestProto(request.getContent()); return CompletableFuture.completedFuture(runCommand(requestProto, null)); } catch (IOException e) { metrics.incNumReadStateMachineFails(); @@ -507,34 +504,8 @@ private ByteString readStateMachineData( */ private ByteString getCachedStateMachineData(Long logIndex, long term, ContainerCommandRequestProto requestProto) throws ExecutionException { - try { - return reconstructWriteChunkRequest( - stateMachineDataCache.get(logIndex, new Callable() { - @Override - public ByteString call() throws Exception { - return readStateMachineData(requestProto, term, logIndex); - } - }), requestProto); - } catch (ExecutionException e) { - throw e; - } - } - - private ByteString reconstructWriteChunkRequest(ByteString data, - ContainerCommandRequestProto requestProto) { - WriteChunkRequestProto writeChunkRequestProto = - requestProto.getWriteChunk(); - // reconstruct the write chunk request - final WriteChunkRequestProto.Builder dataWriteChunkProto = - WriteChunkRequestProto.newBuilder(writeChunkRequestProto) - // adding the state machine data - .setData(data); - - ContainerCommandRequestProto.Builder newStateMachineProto = - ContainerCommandRequestProto.newBuilder(requestProto) - .setWriteChunk(dataWriteChunkProto); - - return newStateMachineProto.build().toByteString(); + return stateMachineDataCache.get(logIndex, + () -> readStateMachineData(requestProto, term, logIndex)); } /** @@ -568,7 +539,8 @@ public CompletableFuture readStateMachineData( } try { final ContainerCommandRequestProto requestProto = - getRequestProto(entry.getStateMachineLogEntry().getLogData()); + getContainerCommandRequestProto( + entry.getStateMachineLogEntry().getLogData()); // readStateMachineData should only be called for "write" to Ratis. Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto)); if (requestProto.getCmdType() == Type.WriteChunk) { @@ -632,7 +604,8 @@ public CompletableFuture applyTransaction(TransactionContext trx) { try { metrics.incNumApplyTransactionsOps(); ContainerCommandRequestProto requestProto = - getRequestProto(trx.getStateMachineLogEntry().getLogData()); + getContainerCommandRequestProto( + trx.getStateMachineLogEntry().getLogData()); Type cmdType = requestProto.getCmdType(); // Make sure that in write chunk, the user data is not set if (cmdType == Type.WriteChunk) {