From 1d279304079cb898e84c8f37ec40fb0e5cfb92ae Mon Sep 17 00:00:00 2001 From: Istvan Fajth Date: Wed, 9 Oct 2019 07:58:47 +0200 Subject: [PATCH] HDDS-2233 - Remove ByteStringHelper and refactor the code to the place where it used (#1596) --- .../hadoop/hdds/scm/XceiverClientManager.java | 7 ++ .../hdds/scm/storage/BlockOutputStream.java | 3 +- .../hadoop/hdds/scm/storage/BufferPool.java | 15 ++++ .../hadoop/hdds/scm/ByteStringConversion.java | 62 +++++++++++++++++ .../hadoop/hdds/scm/ByteStringHelper.java | 69 ------------------- .../container/keyvalue/KeyValueHandler.java | 33 ++++++--- .../keyvalue/helpers/ChunkUtils.java | 34 +-------- .../keyvalue/impl/ChunkManagerDummyImpl.java | 6 +- .../keyvalue/impl/ChunkManagerImpl.java | 60 +++++++--------- .../keyvalue/interfaces/ChunkManager.java | 2 +- .../keyvalue/TestChunkManagerImpl.java | 69 ++++++++++--------- .../client/io/BlockOutputStreamEntryPool.java | 12 +++- .../hadoop/ozone/client/rpc/RpcClient.java | 5 -- .../ozone/container/ContainerTestHelper.java | 11 +-- .../common/impl/TestContainerPersistence.java | 53 +++++++------- 15 files changed, 217 insertions(+), 224 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java delete mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringHelper.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java index ebed288aa5..8218c77087 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java @@ -36,15 +36,18 @@ import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.OzoneSecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; @@ -307,6 +310,10 @@ public HddsProtos.ReplicationType getType() { return HddsProtos.ReplicationType.STAND_ALONE; } + public Function byteBufferToByteStringConversion(){ + return ByteStringConversion.createByteBufferConversion(conf); + } + /** * Get xceiver client metric. */ diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java index f6d1892355..b15ca3f6c8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.scm.ByteStringHelper; import org.apache.hadoop.hdds.scm.XceiverClientReply; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; @@ -590,7 +589,7 @@ public boolean isClosed() { */ private void writeChunkToContainer(ByteBuffer chunk) throws IOException { int effectiveChunkSize = chunk.remaining(); - ByteString data = ByteStringHelper.getByteString(chunk); + ByteString data = bufferPool.byteStringConversion().apply(chunk); Checksum checksum = new Checksum(checksumType, bytesPerChecksum); ChecksumData checksumData = checksum.computeChecksum(chunk); ChunkInfo chunkInfo = ChunkInfo.newBuilder() diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java index 17788c70a6..6d534579c8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java @@ -19,10 +19,13 @@ package org.apache.hadoop.hdds.scm.storage; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdds.scm.ByteStringConversion; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.function.Function; /** * This class creates and manages pool of n buffers. @@ -33,12 +36,24 @@ public class BufferPool { private int currentBufferIndex; private final int bufferSize; private final int capacity; + private final Function byteStringConversion; public BufferPool(int bufferSize, int capacity) { + this(bufferSize, capacity, + ByteStringConversion.createByteBufferConversion(null)); + } + + public BufferPool(int bufferSize, int capacity, + Function byteStringConversion){ this.capacity = capacity; this.bufferSize = bufferSize; bufferList = new ArrayList<>(capacity); currentBufferIndex = -1; + this.byteStringConversion = byteStringConversion; + } + + public Function byteStringConversion(){ + return byteStringConversion; } public ByteBuffer getCurrentBuffer() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java new file mode 100644 index 0000000000..4608df7612 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; + +import java.nio.ByteBuffer; +import java.util.function.Function; + +/** + * Helper class to create a conversion function from ByteBuffer to ByteString + * based on the property + * {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED} in the + * Ozone configuration. + */ +public final class ByteStringConversion { + private ByteStringConversion(){} // no instantiation. + + /** + * Creates the conversion function to be used to convert ByteBuffers to + * ByteString instances to be used in protobuf messages. + * + * @param config the Ozone configuration + * @return the conversion function defined by + * {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED} + * @see
ByteBuffer
+ */ + public static Function createByteBufferConversion( + Configuration config){ + boolean unsafeEnabled = + config!=null && config.getBoolean( + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, + OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); + if (unsafeEnabled) { + return buffer -> UnsafeByteOperations.unsafeWrap(buffer); + } else { + return buffer -> { + ByteString retval = ByteString.copyFrom(buffer); + buffer.flip(); + return retval; + }; + } + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringHelper.java deleted file mode 100644 index ccdf4fac42..0000000000 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringHelper.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdds.scm; - -import com.google.common.base.Preconditions; -import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; -import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations; - -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -/** - * Helper class to perform Unsafe ByteString conversion from byteBuffer or byte - * array depending on the config "ozone.UnsafeByteOperations.enabled". - */ -public final class ByteStringHelper { - private static final AtomicBoolean INITIALIZED = new AtomicBoolean(); - private static volatile boolean isUnsafeByteOperationsEnabled; - - /** - * There is no need to instantiate this class. - */ - private ByteStringHelper() { - } - - public static void init(boolean isUnsafeByteOperation) { - final boolean set = INITIALIZED.compareAndSet(false, true); - if (set) { - ByteStringHelper.isUnsafeByteOperationsEnabled = - isUnsafeByteOperation; - } else { - // already initialized, check values - Preconditions.checkState(isUnsafeByteOperationsEnabled - == isUnsafeByteOperation); - } - } - - private static ByteString copyFrom(ByteBuffer buffer) { - final ByteString bytes = ByteString.copyFrom(buffer); - // flip the buffer so as to read the data starting from pos 0 again - buffer.flip(); - return bytes; - } - - public static ByteString getByteString(ByteBuffer buffer) { - return isUnsafeByteOperationsEnabled ? - UnsafeByteOperations.unsafeWrap(buffer) : copyFrom(buffer); - } - - public static ByteString getByteString(byte[] bytes) { - return isUnsafeByteOperationsEnabled ? - UnsafeByteOperations.unsafeWrap(bytes) : ByteString.copyFrom(bytes); - } - -} \ No newline at end of file diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 4a59e88a2e..bc418839f2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.StorageUnit; @@ -46,7 +47,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .PutSmallFileRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; -import org.apache.hadoop.hdds.scm.ByteStringHelper; +import org.apache.hadoop.hdds.scm.ByteStringConversion; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; @@ -102,6 +103,7 @@ public class KeyValueHandler extends Handler { private final ChunkManager chunkManager; private final VolumeChoosingPolicy volumeChoosingPolicy; private final long maxContainerSize; + private final Function byteBufferToByteString; // A lock that is held during container creation. private final AutoCloseableLock containerCreationLock; @@ -125,10 +127,8 @@ public KeyValueHandler(Configuration config, StateContext context, // this handler lock is used for synchronizing createContainer Requests, // so using a fair lock here. containerCreationLock = new AutoCloseableLock(new ReentrantLock(true)); - boolean isUnsafeByteOperationsEnabled = conf.getBoolean( - OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, - OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); - ByteStringHelper.init(isUnsafeByteOperationsEnabled); + byteBufferToByteString = + ByteStringConversion.createByteBufferConversion(conf); } @VisibleForTesting @@ -547,7 +547,7 @@ ContainerCommandResponseProto handleReadChunk( } // The container can become unhealthy after the lock is released. - // The operation will likely fail/timeout in that happens. + // The operation will likely fail/timeout if that happens. try { checkContainerIsHealthy(kvContainer); } catch (StorageContainerException sce) { @@ -555,7 +555,7 @@ ContainerCommandResponseProto handleReadChunk( } ChunkInfo chunkInfo; - byte[] data; + ByteBuffer data; try { BlockID blockID = BlockID.getFromProtobuf( request.getReadChunk().getBlockID()); @@ -569,7 +569,7 @@ ContainerCommandResponseProto handleReadChunk( data = chunkManager .readChunk(kvContainer, blockID, chunkInfo, dispatcherContext); - metrics.incContainerBytesStats(Type.ReadChunk, data.length); + metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen()); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); } catch (IOException ex) { @@ -578,7 +578,18 @@ ContainerCommandResponseProto handleReadChunk( request); } - return ChunkUtils.getReadChunkResponse(request, data, chunkInfo); + Preconditions.checkNotNull(data, "Chunk data is null"); + + ContainerProtos.ReadChunkResponseProto.Builder response = + ContainerProtos.ReadChunkResponseProto.newBuilder(); + response.setChunkData(chunkInfo.getProtoBufMessage()); + response.setData(byteBufferToByteString.apply(data)); + response.setBlockID(request.getReadChunk().getBlockID()); + + ContainerCommandResponseProto.Builder builder = + ContainerUtils.getSuccessResponseBuilder(request); + builder.setReadChunk(response); + return builder.build(); } /** @@ -800,9 +811,9 @@ ContainerCommandResponseProto handleGetSmallFile( for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) { // if the block is committed, all chunks must have been committed. // Tmp chunk files won't exist here. - byte[] data = chunkManager.readChunk(kvContainer, blockID, + ByteBuffer data = chunkManager.readChunk(kvContainer, blockID, ChunkInfo.getFromProtoBuf(chunk), dispatcherContext); - ByteString current = ByteString.copyFrom(data); + ByteString current = byteBufferToByteString.apply(data); dataBuf = dataBuf.concat(current); chunkInfo = chunk; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java index 91fc25b884..8ca59b5914 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -24,9 +24,6 @@ .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ReadChunkResponseProto; -import org.apache.hadoop.hdds.scm.ByteStringHelper; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; import org.apache.hadoop.io.IOUtils; @@ -142,8 +139,7 @@ public static void writeData(File chunkFile, ChunkInfo chunkInfo, * @return ByteBuffer */ public static ByteBuffer readData(File chunkFile, ChunkInfo data, - VolumeIOStats volumeIOStats) throws StorageContainerException, - ExecutionException, InterruptedException { + VolumeIOStats volumeIOStats) throws StorageContainerException { Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class); if (!chunkFile.exists()) { @@ -168,6 +164,7 @@ public static ByteBuffer readData(File chunkFile, ChunkInfo data, try (FileLock ignored = file.lock(offset, len, true)) { file.read(buf, offset); + buf.flip(); } // Increment volumeIO stats here. @@ -287,33 +284,6 @@ public static ContainerCommandResponseProto getChunkResponseSuccess( return ContainerUtils.getSuccessResponse(msg); } - /** - * Gets a response to the read chunk calls. - * - * @param msg - Msg - * @param data - Data - * @param info - Info - * @return Response. - */ - public static ContainerCommandResponseProto getReadChunkResponse( - ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) { - Preconditions.checkNotNull(msg); - Preconditions.checkNotNull(data, "Chunk data is null"); - Preconditions.checkNotNull(info, "Chunk Info is null"); - - ReadChunkResponseProto.Builder response = - ReadChunkResponseProto.newBuilder(); - response.setChunkData(info.getProtoBufMessage()); - response.setData( - ByteStringHelper.getByteString(data)); - response.setBlockID(msg.getReadChunk().getBlockID()); - - ContainerCommandResponseProto.Builder builder = - ContainerUtils.getSuccessResponseBuilder(msg); - builder.setReadChunk(response); - return builder.build(); - } - @VisibleForTesting static T processFileExclusively( Path path, CheckedSupplier op diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java index 9d63c16603..fa9e205786 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java @@ -120,8 +120,8 @@ public void writeChunk(Container container, BlockID blockID, ChunkInfo info, * TODO: Explore if we need to do that for ozone. */ @Override - public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info, - DispatcherContext dispatcherContext) { + public ByteBuffer readChunk(Container container, BlockID blockID, + ChunkInfo info, DispatcherContext dispatcherContext) { long readStartTime = Time.monotonicNow(); @@ -138,7 +138,7 @@ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info, volumeIOStats.incReadOpCount(); volumeIOStats.incReadBytes(info.getLen()); - return data.array(); + return data; } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java index 3418a64796..e22841eec8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java @@ -195,44 +195,34 @@ protected void updateContainerWriteStats(Container container, ChunkInfo info, * TODO: Right now we do not support partial reads and writes of chunks. * TODO: Explore if we need to do that for ozone. */ - public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info, - DispatcherContext dispatcherContext) throws StorageContainerException { - try { - KeyValueContainerData containerData = (KeyValueContainerData) container - .getContainerData(); - ByteBuffer data; - HddsVolume volume = containerData.getVolume(); - VolumeIOStats volumeIOStats = volume.getVolumeIOStats(); + public ByteBuffer readChunk(Container container, BlockID blockID, + ChunkInfo info, DispatcherContext dispatcherContext) + throws StorageContainerException { + KeyValueContainerData containerData = (KeyValueContainerData) container + .getContainerData(); + ByteBuffer data; + HddsVolume volume = containerData.getVolume(); + VolumeIOStats volumeIOStats = volume.getVolumeIOStats(); - // Checking here, which layout version the container is, and reading - // the chunk file in that format. - // In version1, we verify checksum if it is available and return data - // of the chunk file. - if (containerData.getLayOutVersion() == ChunkLayOutVersion - .getLatestVersion().getVersion()) { - File chunkFile = ChunkUtils.getChunkFile(containerData, info); + // Checking here, which layout version the container is, and reading + // the chunk file in that format. + // In version1, we verify checksum if it is available and return data + // of the chunk file. + if (containerData.getLayOutVersion() == ChunkLayOutVersion + .getLatestVersion().getVersion()) { + File chunkFile = ChunkUtils.getChunkFile(containerData, info); - // In case the chunk file does not exist but tmp chunk file exist, - // read from tmp chunk file if readFromTmpFile is set to true - if (!chunkFile.exists() && dispatcherContext != null - && dispatcherContext.isReadFromTmpFile()) { - chunkFile = getTmpChunkFile(chunkFile, dispatcherContext); - } - data = ChunkUtils.readData(chunkFile, info, volumeIOStats); - containerData.incrReadCount(); - long length = chunkFile.length(); - containerData.incrReadBytes(length); - return data.array(); + // In case the chunk file does not exist but tmp chunk file exist, + // read from tmp chunk file if readFromTmpFile is set to true + if (!chunkFile.exists() && dispatcherContext != null + && dispatcherContext.isReadFromTmpFile()) { + chunkFile = getTmpChunkFile(chunkFile, dispatcherContext); } - } catch (ExecutionException ex) { - LOG.error("read data failed. error: {}", ex); - throw new StorageContainerException("Internal error: ", - ex, CONTAINER_INTERNAL_ERROR); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("read data failed. error: {}", e); - throw new StorageContainerException("Internal error: ", - e, CONTAINER_INTERNAL_ERROR); + data = ChunkUtils.readData(chunkFile, info, volumeIOStats); + containerData.incrReadCount(); + long length = chunkFile.length(); + containerData.incrReadBytes(length); + return data; } return null; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index 5a6898f558..5adb6415ec 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -59,7 +59,7 @@ void writeChunk(Container container, BlockID blockID, ChunkInfo info, * TODO: Right now we do not support partial reads and writes of chunks. * TODO: Explore if we need to do that for ozone. */ - byte[] readChunk(Container container, BlockID blockID, ChunkInfo info, + ByteBuffer readChunk(Container container, BlockID blockID, ChunkInfo info, DispatcherContext dispatcherContext) throws StorageContainerException; /** diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java index cf9ea891eb..84ab56da86 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java @@ -41,7 +41,6 @@ import java.io.File; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.UUID; import static java.nio.charset.StandardCharsets.UTF_8; @@ -65,7 +64,7 @@ public class TestChunkManagerImpl { private BlockID blockID; private ChunkManagerImpl chunkManager; private ChunkInfo chunkInfo; - private byte[] data; + private ByteBuffer data; @Rule public TemporaryFolder folder = new TemporaryFolder(); @@ -92,11 +91,11 @@ public void setUp() throws Exception { keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId); - data = "testing write chunks".getBytes(UTF_8); + data = ByteBuffer.wrap("testing write chunks".getBytes(UTF_8)); // Creating BlockData blockID = new BlockID(1L, 1L); chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID - .getLocalID(), 0), 0, data.length); + .getLocalID(), 0), 0, data.capacity()); // Create a ChunkManager object. chunkManager = new ChunkManagerImpl(true); @@ -118,8 +117,8 @@ public void testWriteChunkStageWriteAndCommit() throws Exception { // As no chunks are written to the volume writeBytes should be 0 checkWriteIOStats(0, 0); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), new DispatcherContext.Builder() + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + new DispatcherContext.Builder() .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build()); // Now a chunk file is being written with Stage WRITE_DATA, so it should // create a temporary chunk file. @@ -137,13 +136,13 @@ public void testWriteChunkStageWriteAndCommit() throws Exception { // As chunk write stage is WRITE_DATA, temp chunk file will be created. assertTrue(tempChunkFile.exists()); - checkWriteIOStats(data.length, 1); + checkWriteIOStats(data.capacity(), 1); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), new DispatcherContext.Builder() + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + new DispatcherContext.Builder() .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build()); - checkWriteIOStats(data.length, 1); + checkWriteIOStats(data.capacity(), 1); // Old temp file should have been renamed to chunk file. assertTrue(chunksPath.listFiles().length == 1); @@ -160,8 +159,8 @@ public void testWriteChunkIncorrectLength() throws Exception { long randomLength = 200L; chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), 0), 0, randomLength); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), getDispatcherContext()); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + getDispatcherContext()); fail("testWriteChunkIncorrectLength failed"); } catch (StorageContainerException ex) { // As we got an exception, writeBytes should be 0. @@ -181,35 +180,36 @@ public void testWriteChunkStageCombinedData() throws Exception { // Initially chunks folder should be empty. assertTrue(chunksPath.listFiles().length == 0); checkWriteIOStats(0, 0); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), getDispatcherContext()); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + getDispatcherContext()); // Now a chunk file is being written with Stage COMBINED_DATA, so it should // create a chunk file. assertTrue(chunksPath.listFiles().length == 1); File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo); assertTrue(chunkFile.exists()); - checkWriteIOStats(data.length, 1); + checkWriteIOStats(data.capacity(), 1); } @Test public void testReadChunk() throws Exception { checkWriteIOStats(0, 0); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), getDispatcherContext()); - checkWriteIOStats(data.length, 1); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + getDispatcherContext()); + checkWriteIOStats(data.capacity(), 1); checkReadIOStats(0, 0); - byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID, + ByteBuffer expectedData = chunkManager.readChunk(keyValueContainer, blockID, chunkInfo, getDispatcherContext()); - assertEquals(expectedData.length, data.length); - assertTrue(Arrays.equals(expectedData, data)); - checkReadIOStats(data.length, 1); + assertEquals(expectedData.limit()-expectedData.position(), + chunkInfo.getLen()); + assertTrue(expectedData.rewind().equals(data.rewind())); + checkReadIOStats(expectedData.capacity(), 1); } @Test public void testDeleteChunk() throws Exception { File chunksPath = new File(keyValueContainerData.getChunksPath()); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), getDispatcherContext()); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + getDispatcherContext()); assertTrue(chunksPath.listFiles().length == 1); chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo); assertTrue(chunksPath.listFiles().length == 0); @@ -218,8 +218,8 @@ public void testDeleteChunk() throws Exception { @Test public void testDeleteChunkUnsupportedRequest() throws Exception { try { - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), getDispatcherContext()); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + getDispatcherContext()); long randomLength = 200L; chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), 0), 0, randomLength); @@ -235,8 +235,8 @@ public void testDeleteChunkUnsupportedRequest() throws Exception { public void testReadChunkFileNotExists() throws Exception { try { // trying to read a chunk, where chunk file does not exist - byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID, - chunkInfo, getDispatcherContext()); + ByteBuffer expectedData = chunkManager.readChunk(keyValueContainer, + blockID, chunkInfo, getDispatcherContext()); fail("testReadChunkFileNotExists failed"); } catch (StorageContainerException ex) { GenericTestUtils.assertExceptionContains("Unable to find the chunk " + @@ -249,20 +249,21 @@ public void testReadChunkFileNotExists() throws Exception { public void testWriteAndReadChunkMultipleTimes() throws Exception { for (int i=0; i<100; i++) { chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID - .getLocalID(), i), 0, data.length); - chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), getDispatcherContext()); + .getLocalID(), i), 0, data.capacity()); + chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data, + getDispatcherContext()); + data.rewind(); } - checkWriteIOStats(data.length*100, 100); + checkWriteIOStats(data.capacity()*100, 100); assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0); for (int i=0; i<100; i++) { chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID - .getLocalID(), i), 0, data.length); + .getLocalID(), i), 0, data.capacity()); chunkManager.readChunk(keyValueContainer, blockID, chunkInfo, getDispatcherContext()); } - checkReadIOStats(data.length*100, 100); + checkReadIOStats(data.capacity()*100, 100); assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0); } diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java index e707e4fea0..045997fd05 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.ozone.client.io; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -100,10 +101,17 @@ public BlockOutputStreamEntryPool(OzoneManagerProtocol omClient, Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0); Preconditions.checkState(blockSize % streamBufferMaxSize == 0); this.bufferPool = - new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize); + new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize, + xceiverClientManager.byteBufferToByteStringConversion()); } - public BlockOutputStreamEntryPool() { + /** + * A constructor for testing purpose only. + * + * @see KeyOutputStream#KeyOutputStream() + */ + @VisibleForTesting + BlockOutputStreamEntryPool() { streamEntries = new ArrayList<>(); omClient = null; keyArgs = null; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index ebe4477955..d0dd124171 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hdds.protocol.StorageType; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ChecksumType; -import org.apache.hadoop.hdds.scm.ByteStringHelper; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.tracing.TracingUtil; import org.apache.hadoop.io.IOUtils; @@ -219,10 +218,6 @@ public RpcClient(Configuration conf, String omServiceId) throws IOException { OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL, OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT); dtService = getOMProxyProvider().getCurrentProxyDelegationToken(); - boolean isUnsafeByteOperationsEnabled = conf.getBoolean( - OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED, - OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT); - ByteStringHelper.init(isUnsafeByteOperationsEnabled); topologyAwareReadEnabled = conf.getBoolean( OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index 313a3c094f..395bda0d5d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.ServerSocket; +import java.nio.ByteBuffer; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -193,10 +194,10 @@ public static ChunkInfo getChunk(long keyID, int seqNo, long offset, * @param len - Number of bytes. * @return byte array with valid data. */ - public static byte[] getData(int len) { + public static ByteBuffer getData(int len) { byte[] data = new byte[len]; r.nextBytes(data); - return data; + return ByteBuffer.wrap(data); } /** @@ -206,7 +207,7 @@ public static byte[] getData(int len) { * @param data - data array * @throws NoSuchAlgorithmException */ - public static void setDataChecksum(ChunkInfo info, byte[] data) + public static void setDataChecksum(ChunkInfo info, ByteBuffer data) throws OzoneChecksumException { Checksum checksum = new Checksum(); info.setChecksumData(checksum.computeChecksum(data)); @@ -232,7 +233,7 @@ public static ContainerCommandRequestProto getWriteChunkRequest( writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf()); - byte[] data = getData(datalen); + ByteBuffer data = getData(datalen); ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen); setDataChecksum(info, data); @@ -262,7 +263,7 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest( throws Exception { ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest = ContainerProtos.PutSmallFileRequestProto.newBuilder(); - byte[] data = getData(dataLen); + ByteBuffer data = getData(dataLen); ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, dataLen); setDataChecksum(info, data); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index c0415e81ff..ed482093df 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -70,7 +70,6 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -349,11 +348,11 @@ private ChunkInfo writeChunkHelper(BlockID blockID) throws IOException { } ChunkInfo info = getChunk( blockID.getLocalID(), 0, 0, datalen); - byte[] data = getData(datalen); + ByteBuffer data = getData(datalen); setDataChecksum(info, data); commitBytesBefore = container.getContainerData() .getVolume().getCommittedBytes(); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); commitBytesAfter = container.getContainerData() .getVolume().getCommittedBytes(); @@ -397,9 +396,9 @@ public void testWritReadManyChunks() throws IOException { Map fileHashMap = new HashMap<>(); for (int x = 0; x < chunkCount; x++) { ChunkInfo info = getChunk(blockID.getLocalID(), x, 0, datalen); - byte[] data = getData(datalen); + ByteBuffer data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); String fileName = String.format("%s.data.%d", blockID.getLocalID(), x); fileHashMap.put(fileName, info); @@ -431,7 +430,7 @@ public void testWritReadManyChunks() throws IOException { for (int x = 0; x < chunkCount; x++) { String fileName = String.format("%s.data.%d", blockID.getLocalID(), x); ChunkInfo info = fileHashMap.get(fileName); - byte[] data = chunkManager + ByteBuffer data = chunkManager .readChunk(container, blockID, info, getDispatcherContext()); ChecksumData checksumData = checksum.computeChecksum(data); Assert.assertEquals(info.getChecksumData(), checksumData); @@ -456,21 +455,22 @@ public void testPartialRead() throws Exception { BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID); ChunkInfo info = getChunk( blockID.getLocalID(), 0, 0, datalen); - byte[] data = getData(datalen); + ByteBuffer data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); - byte[] readData = chunkManager + ByteBuffer readData = chunkManager .readChunk(container, blockID, info, getDispatcherContext()); - assertTrue(Arrays.equals(data, readData)); + assertTrue(data.rewind().equals(readData.rewind())); ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length); - byte[] readData2 = chunkManager + ByteBuffer readData2 = chunkManager .readChunk(container, blockID, info2, getDispatcherContext()); - assertEquals(length, readData2.length); - assertTrue(Arrays.equals( - Arrays.copyOfRange(data, start, start + length), readData2)); + assertEquals(length, info2.getLen()); + boolean equals = + data.position(start).limit(start+length).equals(readData2.rewind()); + assertTrue(equals); } /** @@ -491,15 +491,17 @@ public void testOverWrite() throws IOException, BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID); ChunkInfo info = getChunk( blockID.getLocalID(), 0, 0, datalen); - byte[] data = getData(datalen); + ByteBuffer data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + data.rewind(); + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); + data.rewind(); // With the overwrite flag it should work now. info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); long bytesUsed = container.getContainerData().getBytesUsed(); Assert.assertEquals(datalen, bytesUsed); @@ -531,17 +533,18 @@ public void testMultipleWriteSingleRead() throws IOException, long offset = x * datalen; ChunkInfo info = getChunk( blockID.getLocalID(), 0, offset, datalen); - byte[] data = getData(datalen); + ByteBuffer data = getData(datalen); oldSha.update(data); + data.rewind(); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); } // Request to read the whole data in a single go. ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0, datalen * chunkCount); - byte[] newdata = + ByteBuffer newdata = chunkManager.readChunk(container, blockID, largeChunk, getDispatcherContext()); MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH); @@ -566,9 +569,9 @@ public void testDeleteChunk() throws IOException, BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID); ChunkInfo info = getChunk( blockID.getLocalID(), 0, 0, datalen); - byte[] data = getData(datalen); + ByteBuffer data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); chunkManager.deleteChunk(container, blockID, info); exception.expect(StorageContainerException.class); @@ -681,9 +684,9 @@ public void testPutBlockWithLotsOfChunks() throws IOException, for (int x = 1; x < chunkCount; x++) { // with holes in the front (before x * datalen) info = getChunk(blockID.getLocalID(), x, x * datalen, datalen); - byte[] data = getData(datalen); + ByteBuffer data = getData(datalen); setDataChecksum(info, data); - chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), + chunkManager.writeChunk(container, blockID, info, data, getDispatcherContext()); totalSize += datalen; chunkList.add(info);