diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index d234a3f408..7ce7ee3dd7 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -56,7 +57,6 @@
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -219,39 +219,16 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
     try (Scope scope = GlobalTracer.get()
         .buildSpan("XceiverClientRatis." + request.getCmdType().name())
         .startActive(true)) {
-      ContainerCommandRequestProto finalPayload =
-          ContainerCommandRequestProto.newBuilder(request)
-              .setTraceID(TracingUtil.exportCurrentSpan())
-              .build();
-      boolean isReadOnlyRequest = HddsUtils.isReadOnly(finalPayload);
-      ByteString byteString = finalPayload.toByteString();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest,
-            sanitizeForDebug(finalPayload));
+      final ContainerCommandRequestMessage message
+          = ContainerCommandRequestMessage.toMessage(
+              request, TracingUtil.exportCurrentSpan());
+      if (HddsUtils.isReadOnly(request)) {
+        LOG.debug("sendCommandAsync ReadOnly {}", message);
+        return getClient().sendReadOnlyAsync(message);
+      } else {
+        LOG.debug("sendCommandAsync {}", message);
+        return getClient().sendAsync(message);
       }
-      return isReadOnlyRequest ?
-          getClient().sendReadOnlyAsync(() -> byteString) :
-          getClient().sendAsync(() -> byteString);
-    }
-  }
-
-  private ContainerCommandRequestProto sanitizeForDebug(
-      ContainerCommandRequestProto request) {
-    switch (request.getCmdType()) {
-    case PutSmallFile:
-      return request.toBuilder()
-          .setPutSmallFile(request.getPutSmallFile().toBuilder()
-              .clearData()
-          )
-          .build();
-    case WriteChunk:
-      return request.toBuilder()
-          .setWriteChunk(request.getWriteChunk().toBuilder()
-              .clearData()
-          )
-          .build();
-    default:
-      return request;
     }
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
new file mode 100644
index 0000000000..07a886a0f9
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.ratis;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.JavaUtils;
+
+import java.util.Objects;
+import java.util.function.Supplier;
+
+/**
+ * Implementing the {@link Message} interface
+ * for {@link ContainerCommandRequestProto}.
+ */
+public final class ContainerCommandRequestMessage implements Message {
+  public static ContainerCommandRequestMessage toMessage(
+      ContainerCommandRequestProto request, String traceId) {
+    final ContainerCommandRequestProto.Builder b
+        = ContainerCommandRequestProto.newBuilder(request);
+    if (traceId != null) {
+      b.setTraceID(traceId);
+    }
+
+    ByteString data = ByteString.EMPTY;
+    if (request.getCmdType() == Type.WriteChunk) {
+      final WriteChunkRequestProto w = request.getWriteChunk();
+      data = w.getData();
+      b.setWriteChunk(w.toBuilder().clearData());
+    } else if (request.getCmdType() == Type.PutSmallFile) {
+      final PutSmallFileRequestProto p = request.getPutSmallFile();
+      data = p.getData();
+      b.setPutSmallFile(p.toBuilder().setData(ByteString.EMPTY));
+    }
+    return new ContainerCommandRequestMessage(b.build(), data);
+  }
+
+  public static ContainerCommandRequestProto toProto(
+      ByteString bytes, RaftGroupId groupId)
+      throws InvalidProtocolBufferException {
+    final int i = 4 + bytes.asReadOnlyByteBuffer().getInt();
+    final ContainerCommandRequestProto header
+        = ContainerCommandRequestProto.parseFrom(bytes.substring(4, i));
+    // TODO: setting pipeline id can be avoided if the client is sending it.
+    //       In such case, just have to validate the pipeline id.
+    final ContainerCommandRequestProto.Builder b = header.toBuilder();
+    if (groupId != null) {
+      b.setPipelineID(groupId.getUuid().toString());
+    }
+    final ByteString data = bytes.substring(i);
+    if (header.getCmdType() == Type.WriteChunk) {
+      b.setWriteChunk(b.getWriteChunkBuilder().setData(data));
+    } else if (header.getCmdType() == Type.PutSmallFile) {
+      b.setPutSmallFile(b.getPutSmallFileBuilder().setData(data));
+    }
+    return b.build();
+  }
+
+  private final ContainerCommandRequestProto header;
+  private final ByteString data;
+  private final Supplier<ByteString> contentSupplier
+      = JavaUtils.memoize(this::buildContent);
+
+  private ContainerCommandRequestMessage(
+      ContainerCommandRequestProto header, ByteString data) {
+    this.header = Objects.requireNonNull(header, "header == null");
+    this.data = Objects.requireNonNull(data, "data == null");
+  }
+
+  private ByteString buildContent() {
+    final ByteString headerBytes = header.toByteString();
+    return RatisHelper.int2ByteString(headerBytes.size())
+        .concat(headerBytes)
+        .concat(data);
+  }
+
+  @Override
+  public ByteString getContent() {
+    return contentSupplier.get();
+  }
+
+  @Override
+  public String toString() {
+    return header + ", data.size=" + data.size();
+  }
+}
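Note on the new class above: getContent() produces a simple length-prefixed frame -- a 4-byte big-endian header length, then the serialized header proto (with the chunk data removed), then the raw data bytes -- and toProto() inverts it. Below is a minimal, self-contained sketch of the same framing round trip; the class and method names are hypothetical, and only ByteString with its rope-style concat()/substring() comes from the shaded protobuf library used in the patch.

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.nio.ByteBuffer;

// Sketch only -- not part of the patch.
// Frame layout: [4-byte big-endian header length][header bytes][data bytes].
public final class FramingDemo {
  static ByteString encode(ByteString header, ByteString data) {
    final ByteString length = ByteString.copyFrom(
        ByteBuffer.allocate(4).putInt(header.size()).array());
    // concat() builds a rope, so the (possibly large) data is not copied again.
    return length.concat(header).concat(data);
  }

  static ByteString[] decode(ByteString frame) {
    final int headerEnd = 4 + frame.asReadOnlyByteBuffer().getInt();
    return new ByteString[] {
        frame.substring(4, headerEnd), // header
        frame.substring(headerEnd)     // data
    };
  }

  public static void main(String[] args) {
    final ByteString[] parts = decode(
        encode(ByteString.copyFromUtf8("header"), ByteString.copyFromUtf8("data")));
    System.out.println(parts[0].toStringUtf8() + " / " + parts[1].toStringUtf8());
  }
}

This is also why the old sanitizeForDebug() helper could be deleted: the message's toString() prints the header plus data.size, so debug logging never renders the chunk bytes.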
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 3ad4e2e7a2..fc0effa9e6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.ratis;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
@@ -272,4 +273,15 @@ static Long getMinReplicatedIndex(
     return commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex)
         .min(Long::compareTo).orElse(null);
   }
+
+  static ByteString int2ByteString(int n) {
+    final ByteString.Output out = ByteString.newOutput();
+    try(DataOutputStream dataOut = new DataOutputStream(out)) {
+      dataOut.writeInt(n);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "Failed to write integer n = " + n + " to a ByteString.", e);
+    }
+    return out.toByteString();
+  }
 }
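The helper above writes the length with DataOutputStream, so the prefix is big-endian, which is exactly what toProto()'s asReadOnlyByteBuffer().getInt() expects (ByteBuffer also defaults to big-endian). A quick round-trip check, assuming it runs in the same package as RatisHelper:

package org.apache.hadoop.hdds.ratis;

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

// Sketch only -- not part of the patch.
public final class Int2ByteStringCheck {
  public static void main(String[] args) {
    final int n = 0x12345678;
    final ByteString bytes = RatisHelper.int2ByteString(n);
    // Expect a 4-byte prefix that reads back as the same value.
    System.out.println(bytes.size() == 4
        && bytes.asReadOnlyByteBuffer().getInt() == n); // true
  }
}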
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
new file mode 100644
index 0000000000..bbe6ab7cca
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.ratis;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.hadoop.ozone.common.ChecksumData;
+import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Random;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+/** Testing {@link ContainerCommandRequestMessage}. */
+public class TestContainerCommandRequestMessage {
+  static final Random RANDOM = new Random();
+
+  static ByteString newData(int length, Random random) {
+    final ByteString.Output out = ByteString.newOutput();
+    for(int i = 0; i < length; i++) {
+      out.write(random.nextInt());
+    }
+    return out.toByteString();
+  }
+
+  static ChecksumData checksum(ByteString data) {
+    try {
+      return new Checksum().computeChecksum(data.toByteArray());
+    } catch (OzoneChecksumException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  static ContainerCommandRequestProto newPutSmallFile(
+      BlockID blockID, ByteString data) {
+    final BlockData.Builder blockData
+        = BlockData.newBuilder()
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf());
+    final PutBlockRequestProto.Builder putBlockRequest
+        = PutBlockRequestProto.newBuilder()
+        .setBlockData(blockData);
+    final KeyValue keyValue = KeyValue.newBuilder()
+        .setKey("OverWriteRequested")
+        .setValue("true")
+        .build();
+    final ChunkInfo chunk = ChunkInfo.newBuilder()
+        .setChunkName(blockID.getLocalID() + "_chunk")
+        .setOffset(0)
+        .setLen(data.size())
+        .addMetadata(keyValue)
+        .setChecksumData(checksum(data).getProtoBufMessage())
+        .build();
+    final PutSmallFileRequestProto putSmallFileRequest
+        = PutSmallFileRequestProto.newBuilder()
+        .setChunkInfo(chunk)
+        .setBlock(putBlockRequest)
+        .setData(data)
+        .build();
+    return ContainerCommandRequestProto.newBuilder()
+        .setCmdType(Type.PutSmallFile)
+        .setContainerID(blockID.getContainerID())
+        .setDatanodeUuid(UUID.randomUUID().toString())
+        .setPutSmallFile(putSmallFileRequest)
+        .build();
+  }
+
+  static ContainerCommandRequestProto newWriteChunk(
+      BlockID blockID, ByteString data) {
+    final ChunkInfo chunk = ChunkInfo.newBuilder()
+        .setChunkName(blockID.getLocalID() + "_chunk_" + 1)
+        .setOffset(0)
+        .setLen(data.size())
+        .setChecksumData(checksum(data).getProtoBufMessage())
+        .build();
+
+    final WriteChunkRequestProto.Builder writeChunkRequest
+        = WriteChunkRequestProto.newBuilder()
+        .setBlockID(blockID.getDatanodeBlockIDProtobuf())
+        .setChunkData(chunk)
+        .setData(data);
+    return ContainerCommandRequestProto.newBuilder()
+        .setCmdType(Type.WriteChunk)
+        .setContainerID(blockID.getContainerID())
+        .setDatanodeUuid(UUID.randomUUID().toString())
+        .setWriteChunk(writeChunkRequest)
+        .build();
+  }
+
+  @Test
+  public void testPutSmallFile() throws Exception {
+    runTest(TestContainerCommandRequestMessage::newPutSmallFile);
+  }
+
+  @Test
+  public void testWriteChunk() throws Exception {
+    runTest(TestContainerCommandRequestMessage::newWriteChunk);
+  }
+
+  static void runTest(
+      BiFunction<BlockID, ByteString, ContainerCommandRequestProto> method)
+      throws Exception {
+    for(int i = 0; i < 2; i++) {
+      runTest(i, method);
+    }
+    for(int i = 2; i < 1 << 10;) {
+      runTest(i + 1 + RANDOM.nextInt(i - 1), method);
+      i <<= 1;
+      runTest(i, method);
+    }
+  }
+
+  static void runTest(int length,
+      BiFunction<BlockID, ByteString, ContainerCommandRequestProto> method)
+      throws Exception {
+    System.out.println("length=" + length);
+    final BlockID blockID = new BlockID(RANDOM.nextLong(), RANDOM.nextLong());
+    final ByteString data = newData(length, RANDOM);
+
+    final ContainerCommandRequestProto original = method.apply(blockID, data);
+    final ContainerCommandRequestMessage message
+        = ContainerCommandRequestMessage.toMessage(original, null);
+    final ContainerCommandRequestProto computed
+        = ContainerCommandRequestMessage.toProto(message.getContent(), null);
+    Assert.assertEquals(original, computed);
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 7b638a3cd8..489a640d8b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -313,7 +314,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
       throws IOException {
     long startTime = Time.monotonicNowNanos();
     final ContainerCommandRequestProto proto =
-        getContainerCommandRequestProto(request.getMessage().getContent());
+        message2ContainerCommandRequestProto(request.getMessage());
     Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
     try {
       dispatcher.validateContainerCommand(proto);
@@ -363,7 +364,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
         .setStateMachine(this)
         .setServerRole(RaftPeerRole.LEADER)
         .setStateMachineContext(startTime)
-        .setLogData(request.getMessage().getContent())
+        .setLogData(proto.toByteString())
         .build();
   }
 
@@ -383,6 +384,11 @@ private ContainerCommandRequestProto getContainerCommandRequestProto(
         .setPipelineID(gid.getUuid().toString()).build();
   }
 
+  private ContainerCommandRequestProto message2ContainerCommandRequestProto(
+      Message message) throws InvalidProtocolBufferException {
+    return ContainerCommandRequestMessage.toProto(message.getContent(), gid);
+  }
+
   private ContainerCommandResponseProto dispatchCommand(
       ContainerCommandRequestProto requestProto, DispatcherContext context) {
     LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
@@ -530,7 +536,7 @@ public CompletableFuture<Message> query(Message request) {
     try {
       metrics.incNumQueryStateMachineOps();
       final ContainerCommandRequestProto requestProto =
-          getContainerCommandRequestProto(request.getContent());
+          message2ContainerCommandRequestProto(request);
       return CompletableFuture
           .completedFuture(runCommand(requestProto, null)::toByteString);
     } catch (IOException e) {
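One detail of the server-side integration above: message2ContainerCommandRequestProto passes the state machine's group id into toProto, so the decoded proto always carries the local pipeline id regardless of what the client sent (the TODO in toProto notes this could later become a validation instead). A hedged sketch of that behavior; RaftGroupId.randomId() stands in for the server's real gid, and message is assumed to be a ContainerCommandRequestMessage built via toMessage:

// Sketch only -- not part of the patch.
static ContainerCommandRequestProto decodeOnServer(
    ContainerCommandRequestMessage message) throws Exception {
  final RaftGroupId gid = RaftGroupId.randomId(); // stand-in for the server's gid
  final ContainerCommandRequestProto proto =
      ContainerCommandRequestMessage.toProto(message.getContent(), gid);
  // toProto() stamped the group id's UUID as the pipeline id.
  assert proto.getPipelineID().equals(gid.getUuid().toString());
  return proto;
}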
traceID={}", gid, @@ -530,7 +536,7 @@ public CompletableFuture query(Message request) { try { metrics.incNumQueryStateMachineOps(); final ContainerCommandRequestProto requestProto = - getContainerCommandRequestProto(request.getContent()); + message2ContainerCommandRequestProto(request); return CompletableFuture .completedFuture(runCommand(requestProto, null)::toByteString); } catch (IOException e) { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index 179547b844..80e91cdf55 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction; +import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage; import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.pipeline.PipelineID; import org.apache.hadoop.hdds.security.x509.SecurityConfig; @@ -516,8 +517,8 @@ private RaftClientRequest createRaftClientRequest( RaftClientRequest.Type type) { return new RaftClientRequest(clientId, server.getId(), RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()), - nextCallId(), Message.valueOf(request.toByteString()), type, - null); + nextCallId(), ContainerCommandRequestMessage.toMessage(request, null), + type, null); } private GroupInfoRequest createGroupInfoRequest( diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java index a043cdce2e..747bc3eb77 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -80,7 +80,7 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo, ByteBuffer data, VolumeIOStats volumeIOStats, boolean sync) throws StorageContainerException, ExecutionException, InterruptedException, NoSuchAlgorithmException { - int bufferSize = data.capacity(); + final int bufferSize = data.remaining(); Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); if (bufferSize != chunkInfo.getLen()) { String err = String.format("data array does not match the length " +