HDDS-2169. Avoid buffer copies while submitting client requests in Ratis. Contributed by Tsz-wo Sze (#1517).

Shashikant Banerjee 2019-10-07 09:34:36 +05:30
parent 55c5436f39
commit 022fe5f5b2
7 changed files with 294 additions and 39 deletions

View File: XceiverClientRatis.java

@@ -41,6 +41,7 @@
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -56,7 +57,6 @@
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -219,39 +219,16 @@ private CompletableFuture<RaftClientReply> sendRequestAsync(
     try (Scope scope = GlobalTracer.get()
         .buildSpan("XceiverClientRatis." + request.getCmdType().name())
         .startActive(true)) {
-      ContainerCommandRequestProto finalPayload =
-          ContainerCommandRequestProto.newBuilder(request)
-              .setTraceID(TracingUtil.exportCurrentSpan())
-              .build();
-      boolean isReadOnlyRequest = HddsUtils.isReadOnly(finalPayload);
-      ByteString byteString = finalPayload.toByteString();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest,
-            sanitizeForDebug(finalPayload));
-      }
-      return isReadOnlyRequest ?
-          getClient().sendReadOnlyAsync(() -> byteString) :
-          getClient().sendAsync(() -> byteString);
+      final ContainerCommandRequestMessage message
+          = ContainerCommandRequestMessage.toMessage(
+              request, TracingUtil.exportCurrentSpan());
+      if (HddsUtils.isReadOnly(request)) {
+        LOG.debug("sendCommandAsync ReadOnly {}", message);
+        return getClient().sendReadOnlyAsync(message);
+      } else {
+        LOG.debug("sendCommandAsync {}", message);
+        return getClient().sendAsync(message);
+      }
     }
   }
-
-  private ContainerCommandRequestProto sanitizeForDebug(
-      ContainerCommandRequestProto request) {
-    switch (request.getCmdType()) {
-    case PutSmallFile:
-      return request.toBuilder()
-          .setPutSmallFile(request.getPutSmallFile().toBuilder()
-              .clearData())
-          .build();
-    case WriteChunk:
-      return request.toBuilder()
-          .setWriteChunk(request.getWriteChunk().toBuilder()
-              .clearData())
-          .build();
-    default:
-      return request;
-    }
-  }
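Note: this hunk is the heart of the change. The old path embedded the chunk payload in the proto, so finalPayload.toByteString() re-serialized the entire request, payload included, into a freshly allocated buffer. ContainerCommandRequestMessage (added in the next file) serializes only a stripped header and splices the payload back in with ByteString.concat, which for non-trivial sizes builds a rope (a tree of segments) instead of copying bytes. A minimal standalone sketch of that behavior; the rope threshold is an internal detail of the shaded protobuf ByteString:

import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

public class RopeConcatSketch {
  public static void main(String[] args) {
    ByteString data = ByteString.copyFrom(new byte[4 << 20]);    // stand-in for a 4 MB chunk
    ByteString header = ByteString.copyFromUtf8("header-bytes"); // small serialized header
    // concat of sizeable ByteStrings builds a rope (a tree of segments);
    // the 4 MB of payload bytes are not copied here
    ByteString framed = header.concat(data);
    System.out.println(framed.size()); // 4194316
  }
}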

View File: ContainerCommandRequestMessage.java

@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.ratis;

import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.JavaUtils;

import java.util.Objects;
import java.util.function.Supplier;

/**
* Implementing the {@link Message} interface
* for {@link ContainerCommandRequestProto}.
*/
public final class ContainerCommandRequestMessage implements Message {
public static ContainerCommandRequestMessage toMessage(
ContainerCommandRequestProto request, String traceId) {
final ContainerCommandRequestProto.Builder b
= ContainerCommandRequestProto.newBuilder(request);
if (traceId != null) {
b.setTraceID(traceId);
}
ByteString data = ByteString.EMPTY;
if (request.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto w = request.getWriteChunk();
data = w.getData();
b.setWriteChunk(w.toBuilder().clearData());
} else if (request.getCmdType() == Type.PutSmallFile) {
final PutSmallFileRequestProto p = request.getPutSmallFile();
data = p.getData();
b.setPutSmallFile(p.toBuilder().setData(ByteString.EMPTY));
}
return new ContainerCommandRequestMessage(b.build(), data);
}
public static ContainerCommandRequestProto toProto(
ByteString bytes, RaftGroupId groupId)
throws InvalidProtocolBufferException {
final int i = 4 + bytes.asReadOnlyByteBuffer().getInt();
final ContainerCommandRequestProto header
= ContainerCommandRequestProto.parseFrom(bytes.substring(4, i));
// TODO: setting pipeline id can be avoided if the client is sending it.
// In such case, just have to validate the pipeline id.
final ContainerCommandRequestProto.Builder b = header.toBuilder();
if (groupId != null) {
b.setPipelineID(groupId.getUuid().toString());
}
final ByteString data = bytes.substring(i);
if (header.getCmdType() == Type.WriteChunk) {
b.setWriteChunk(b.getWriteChunkBuilder().setData(data));
} else if (header.getCmdType() == Type.PutSmallFile) {
b.setPutSmallFile(b.getPutSmallFileBuilder().setData(data));
}
return b.build();
}
private final ContainerCommandRequestProto header;
private final ByteString data;
private final Supplier<ByteString> contentSupplier
= JavaUtils.memoize(this::buildContent);
private ContainerCommandRequestMessage(
ContainerCommandRequestProto header, ByteString data) {
this.header = Objects.requireNonNull(header, "header == null");
this.data = Objects.requireNonNull(data, "data == null");
}
private ByteString buildContent() {
final ByteString headerBytes = header.toByteString();
return RatisHelper.int2ByteString(headerBytes.size())
.concat(headerBytes)
.concat(data);
}
@Override
public ByteString getContent() {
return contentSupplier.get();
}
@Override
public String toString() {
return header + ", data.size=" + data.size();
}
}
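Note: buildContent and toProto together define the wire framing: a 4-byte big-endian length of the serialized header (whose bulky data field has been cleared), then the header bytes, then the raw payload. A decoding fragment mirroring toProto, with illustrative variable names; ByteString.substring returns a view rather than a copy:

    // content layout: [int32 headerLen][header proto, data cleared][raw data]
    final ByteString content = message.getContent();
    final int headerLen = content.asReadOnlyByteBuffer().getInt(); // first 4 bytes
    final ByteString headerBytes = content.substring(4, 4 + headerLen);
    final ByteString data = content.substring(4 + headerLen);      // zero-copy view of payload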

View File: RatisHelper.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.ratis;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
@@ -272,4 +273,15 @@ static Long getMinReplicatedIndex(
     return commitInfos.stream().map(RaftProtos.CommitInfoProto::getCommitIndex)
         .min(Long::compareTo).orElse(null);
   }
+
+  static ByteString int2ByteString(int n) {
+    final ByteString.Output out = ByteString.newOutput();
+    try (DataOutputStream dataOut = new DataOutputStream(out)) {
+      dataOut.writeInt(n);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "Failed to write integer n = " + n + " to a ByteString.", e);
+    }
+    return out.toByteString();
+  }
 }
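Note: DataOutputStream.writeInt emits exactly four big-endian bytes, which is what allows ContainerCommandRequestMessage.toProto to read the prefix back with ByteBuffer.getInt(). An illustrative round trip (fragment, assuming same-package access to the helper):

    final int n = 0x0A0B0C0D;
    final ByteString prefix = RatisHelper.int2ByteString(n);
    assert prefix.size() == 4;
    assert prefix.asReadOnlyByteBuffer().getInt() == n;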

View File: TestContainerCommandRequestMessage.java

@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.ratis;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;

import java.util.Random;
import java.util.UUID;
import java.util.function.BiFunction;

/** Testing {@link ContainerCommandRequestMessage}. */
public class TestContainerCommandRequestMessage {
static final Random RANDOM = new Random();
static ByteString newData(int length, Random random) {
final ByteString.Output out = ByteString.newOutput();
for(int i = 0; i < length; i++) {
out.write(random.nextInt());
}
return out.toByteString();
}
static ChecksumData checksum(ByteString data) {
try {
return new Checksum().computeChecksum(data.toByteArray());
} catch (OzoneChecksumException e) {
throw new IllegalStateException(e);
}
}
static ContainerCommandRequestProto newPutSmallFile(
BlockID blockID, ByteString data) {
final BlockData.Builder blockData
= BlockData.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf());
final PutBlockRequestProto.Builder putBlockRequest
= PutBlockRequestProto.newBuilder()
.setBlockData(blockData);
final KeyValue keyValue = KeyValue.newBuilder()
.setKey("OverWriteRequested")
.setValue("true")
.build();
final ChunkInfo chunk = ChunkInfo.newBuilder()
.setChunkName(blockID.getLocalID() + "_chunk")
.setOffset(0)
.setLen(data.size())
.addMetadata(keyValue)
.setChecksumData(checksum(data).getProtoBufMessage())
.build();
final PutSmallFileRequestProto putSmallFileRequest
= PutSmallFileRequestProto.newBuilder()
.setChunkInfo(chunk)
.setBlock(putBlockRequest)
.setData(data)
.build();
return ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.PutSmallFile)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(UUID.randomUUID().toString())
.setPutSmallFile(putSmallFileRequest)
.build();
}
static ContainerCommandRequestProto newWriteChunk(
BlockID blockID, ByteString data) {
final ChunkInfo chunk = ChunkInfo.newBuilder()
.setChunkName(blockID.getLocalID() + "_chunk_" + 1)
.setOffset(0)
.setLen(data.size())
.setChecksumData(checksum(data).getProtoBufMessage())
.build();
final WriteChunkRequestProto.Builder writeChunkRequest
= WriteChunkRequestProto.newBuilder()
.setBlockID(blockID.getDatanodeBlockIDProtobuf())
.setChunkData(chunk)
.setData(data);
return ContainerCommandRequestProto.newBuilder()
.setCmdType(Type.WriteChunk)
.setContainerID(blockID.getContainerID())
.setDatanodeUuid(UUID.randomUUID().toString())
.setWriteChunk(writeChunkRequest)
.build();
}
@Test
public void testPutSmallFile() throws Exception {
runTest(TestContainerCommandRequestMessage::newPutSmallFile);
}
@Test
public void testWriteChunk() throws Exception {
runTest(TestContainerCommandRequestMessage::newWriteChunk);
}
static void runTest(
BiFunction<BlockID, ByteString, ContainerCommandRequestProto> method)
throws Exception {
for(int i = 0; i < 2; i++) {
runTest(i, method);
}
for(int i = 2; i < 1 << 10;) {
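      // one random length strictly between i and 2i, then 2i itself:
      // every power of two up to 2^10, plus a sample from each gap in between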
runTest(i + 1 + RANDOM.nextInt(i - 1), method);
i <<= 1;
runTest(i, method);
}
}
static void runTest(int length,
BiFunction<BlockID, ByteString, ContainerCommandRequestProto> method)
throws Exception {
System.out.println("length=" + length);
final BlockID blockID = new BlockID(RANDOM.nextLong(), RANDOM.nextLong());
final ByteString data = newData(length, RANDOM);
final ContainerCommandRequestProto original = method.apply(blockID, data);
final ContainerCommandRequestMessage message
= ContainerCommandRequestMessage.toMessage(original, null);
final ContainerCommandRequestProto computed
= ContainerCommandRequestMessage.toProto(message.getContent(), null);
Assert.assertEquals(original, computed);
}
}

View File: ContainerStateMachine.java

@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -313,7 +314,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
       throws IOException {
     long startTime = Time.monotonicNowNanos();
     final ContainerCommandRequestProto proto =
-        getContainerCommandRequestProto(request.getMessage().getContent());
+        message2ContainerCommandRequestProto(request.getMessage());
     Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
     try {
       dispatcher.validateContainerCommand(proto);
@@ -363,7 +364,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
           .setStateMachine(this)
           .setServerRole(RaftPeerRole.LEADER)
           .setStateMachineContext(startTime)
-          .setLogData(request.getMessage().getContent())
+          .setLogData(proto.toByteString())
           .build();
   }
@@ -383,6 +384,11 @@ private ContainerCommandRequestProto getContainerCommandRequestProto(
         .setPipelineID(gid.getUuid().toString()).build();
   }
 
+  private ContainerCommandRequestProto message2ContainerCommandRequestProto(
+      Message message) throws InvalidProtocolBufferException {
+    return ContainerCommandRequestMessage.toProto(message.getContent(), gid);
+  }
+
   private ContainerCommandResponseProto dispatchCommand(
       ContainerCommandRequestProto requestProto, DispatcherContext context) {
     LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
@@ -530,7 +536,7 @@ public CompletableFuture<Message> query(Message request) {
     try {
       metrics.incNumQueryStateMachineOps();
       final ContainerCommandRequestProto requestProto =
-          getContainerCommandRequestProto(request.getContent());
+          message2ContainerCommandRequestProto(request);
       return CompletableFuture
           .completedFuture(runCommand(requestProto, null)::toByteString);
     } catch (IOException e) {
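Note: the setLogData change follows from the new framing. The client message content is now a length-prefixed frame rather than a bare ContainerCommandRequestProto, so it can no longer go into the Raft log as-is; the state machine instead re-serializes the parsed proto, which by this point also carries the pipeline ID set from the Raft group ID.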

View File: XceiverServerRatis.java

@@ -27,6 +27,7 @@
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
@@ -516,8 +517,8 @@ private RaftClientRequest createRaftClientRequest(
       RaftClientRequest.Type type) {
     return new RaftClientRequest(clientId, server.getId(),
         RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineID).getId()),
-        nextCallId(), Message.valueOf(request.toByteString()), type,
-        null);
+        nextCallId(), ContainerCommandRequestMessage.toMessage(request, null),
+        type, null);
   }
 
   private GroupInfoRequest createGroupInfoRequest(
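Note: server-originated commands built here now go through the same framing as client requests; passing null for the trace ID simply leaves traceID unset, per the null check in toMessage.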

View File: ChunkUtils.java

@@ -80,7 +80,7 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo,
       ByteBuffer data, VolumeIOStats volumeIOStats, boolean sync)
       throws StorageContainerException, ExecutionException,
       InterruptedException, NoSuchAlgorithmException {
-    int bufferSize = data.capacity();
+    final int bufferSize = data.remaining();
     Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
     if (bufferSize != chunkInfo.getLen()) {
       String err = String.format("data array does not match the length " +
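Note: this one-line fix matters because capacity() reports the size of the whole backing buffer while remaining() reports the bytes between position and limit, which is what a channel write actually consumes. With the new framing, the chunk data ByteString is a substring view into a larger buffer, so its asReadOnlyByteBuffer() can have a capacity larger than the payload itself. A standalone illustration:

import java.nio.ByteBuffer;

public class RemainingVsCapacity {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(8192);  // capacity() == 8192
    buf.position(1024).limit(5120);              // expose a 4096-byte window
    System.out.println(buf.capacity());          // 8192: size of the whole backing array
    System.out.println(buf.remaining());         // 4096: bytes a channel write consumes
  }
}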