HDDS-2233 - Remove ByteStringHelper and refactor the code to the place where it used (#1596)
This commit is contained in:
parent
87d9f3668c
commit
1d27930407
@ -36,15 +36,18 @@ import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
|
|||||||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
import org.apache.hadoop.ozone.OzoneSecurityUtil;
|
import org.apache.hadoop.ozone.OzoneSecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.security.cert.CertificateException;
|
import java.security.cert.CertificateException;
|
||||||
import java.security.cert.X509Certificate;
|
import java.security.cert.X509Certificate;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||||
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
|
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
|
||||||
@ -307,6 +310,10 @@ public class XceiverClientManager implements Closeable {
|
|||||||
return HddsProtos.ReplicationType.STAND_ALONE;
|
return HddsProtos.ReplicationType.STAND_ALONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Function<ByteBuffer, ByteString> byteBufferToByteStringConversion(){
|
||||||
|
return ByteStringConversion.createByteBufferConversion(conf);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get xceiver client metric.
|
* Get xceiver client metric.
|
||||||
*/
|
*/
|
||||||
|
@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
|
|||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.scm.ByteStringHelper;
|
|
||||||
import org.apache.hadoop.hdds.scm.XceiverClientReply;
|
import org.apache.hadoop.hdds.scm.XceiverClientReply;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
|
||||||
@ -590,7 +589,7 @@ public class BlockOutputStream extends OutputStream {
|
|||||||
*/
|
*/
|
||||||
private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
|
private void writeChunkToContainer(ByteBuffer chunk) throws IOException {
|
||||||
int effectiveChunkSize = chunk.remaining();
|
int effectiveChunkSize = chunk.remaining();
|
||||||
ByteString data = ByteStringHelper.getByteString(chunk);
|
ByteString data = bufferPool.byteStringConversion().apply(chunk);
|
||||||
Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
|
Checksum checksum = new Checksum(checksumType, bytesPerChecksum);
|
||||||
ChecksumData checksumData = checksum.computeChecksum(chunk);
|
ChecksumData checksumData = checksum.computeChecksum(chunk);
|
||||||
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
|
ChunkInfo chunkInfo = ChunkInfo.newBuilder()
|
||||||
|
@ -19,10 +19,13 @@
|
|||||||
package org.apache.hadoop.hdds.scm.storage;
|
package org.apache.hadoop.hdds.scm.storage;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdds.scm.ByteStringConversion;
|
||||||
|
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class creates and manages pool of n buffers.
|
* This class creates and manages pool of n buffers.
|
||||||
@ -33,12 +36,24 @@ public class BufferPool {
|
|||||||
private int currentBufferIndex;
|
private int currentBufferIndex;
|
||||||
private final int bufferSize;
|
private final int bufferSize;
|
||||||
private final int capacity;
|
private final int capacity;
|
||||||
|
private final Function<ByteBuffer, ByteString> byteStringConversion;
|
||||||
|
|
||||||
public BufferPool(int bufferSize, int capacity) {
|
public BufferPool(int bufferSize, int capacity) {
|
||||||
|
this(bufferSize, capacity,
|
||||||
|
ByteStringConversion.createByteBufferConversion(null));
|
||||||
|
}
|
||||||
|
|
||||||
|
public BufferPool(int bufferSize, int capacity,
|
||||||
|
Function<ByteBuffer, ByteString> byteStringConversion){
|
||||||
this.capacity = capacity;
|
this.capacity = capacity;
|
||||||
this.bufferSize = bufferSize;
|
this.bufferSize = bufferSize;
|
||||||
bufferList = new ArrayList<>(capacity);
|
bufferList = new ArrayList<>(capacity);
|
||||||
currentBufferIndex = -1;
|
currentBufferIndex = -1;
|
||||||
|
this.byteStringConversion = byteStringConversion;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Function<ByteBuffer, ByteString> byteStringConversion(){
|
||||||
|
return byteStringConversion;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ByteBuffer getCurrentBuffer() {
|
public ByteBuffer getCurrentBuffer() {
|
||||||
|
@ -0,0 +1,62 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdds.scm;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||||
|
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
||||||
|
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class to create a conversion function from ByteBuffer to ByteString
|
||||||
|
* based on the property
|
||||||
|
* {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED} in the
|
||||||
|
* Ozone configuration.
|
||||||
|
*/
|
||||||
|
public final class ByteStringConversion {
|
||||||
|
private ByteStringConversion(){} // no instantiation.
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates the conversion function to be used to convert ByteBuffers to
|
||||||
|
* ByteString instances to be used in protobuf messages.
|
||||||
|
*
|
||||||
|
* @param config the Ozone configuration
|
||||||
|
* @return the conversion function defined by
|
||||||
|
* {@link OzoneConfigKeys#OZONE_UNSAFEBYTEOPERATIONS_ENABLED}
|
||||||
|
* @see <pre>ByteBuffer</pre>
|
||||||
|
*/
|
||||||
|
public static Function<ByteBuffer, ByteString> createByteBufferConversion(
|
||||||
|
Configuration config){
|
||||||
|
boolean unsafeEnabled =
|
||||||
|
config!=null && config.getBoolean(
|
||||||
|
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
|
||||||
|
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
|
||||||
|
if (unsafeEnabled) {
|
||||||
|
return buffer -> UnsafeByteOperations.unsafeWrap(buffer);
|
||||||
|
} else {
|
||||||
|
return buffer -> {
|
||||||
|
ByteString retval = ByteString.copyFrom(buffer);
|
||||||
|
buffer.flip();
|
||||||
|
return retval;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,69 +0,0 @@
|
|||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hdds.scm;
|
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
|
|
||||||
import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
/**
|
|
||||||
* Helper class to perform Unsafe ByteString conversion from byteBuffer or byte
|
|
||||||
* array depending on the config "ozone.UnsafeByteOperations.enabled".
|
|
||||||
*/
|
|
||||||
public final class ByteStringHelper {
|
|
||||||
private static final AtomicBoolean INITIALIZED = new AtomicBoolean();
|
|
||||||
private static volatile boolean isUnsafeByteOperationsEnabled;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* There is no need to instantiate this class.
|
|
||||||
*/
|
|
||||||
private ByteStringHelper() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void init(boolean isUnsafeByteOperation) {
|
|
||||||
final boolean set = INITIALIZED.compareAndSet(false, true);
|
|
||||||
if (set) {
|
|
||||||
ByteStringHelper.isUnsafeByteOperationsEnabled =
|
|
||||||
isUnsafeByteOperation;
|
|
||||||
} else {
|
|
||||||
// already initialized, check values
|
|
||||||
Preconditions.checkState(isUnsafeByteOperationsEnabled
|
|
||||||
== isUnsafeByteOperation);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ByteString copyFrom(ByteBuffer buffer) {
|
|
||||||
final ByteString bytes = ByteString.copyFrom(buffer);
|
|
||||||
// flip the buffer so as to read the data starting from pos 0 again
|
|
||||||
buffer.flip();
|
|
||||||
return bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ByteString getByteString(ByteBuffer buffer) {
|
|
||||||
return isUnsafeByteOperationsEnabled ?
|
|
||||||
UnsafeByteOperations.unsafeWrap(buffer) : copyFrom(buffer);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ByteString getByteString(byte[] bytes) {
|
|
||||||
return isUnsafeByteOperationsEnabled ?
|
|
||||||
UnsafeByteOperations.unsafeWrap(bytes) : ByteString.copyFrom(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -27,6 +27,7 @@ import java.util.LinkedList;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.StorageUnit;
|
import org.apache.hadoop.conf.StorageUnit;
|
||||||
@ -46,7 +47,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
|
|||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.PutSmallFileRequestProto;
|
.PutSmallFileRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
|
||||||
import org.apache.hadoop.hdds.scm.ByteStringHelper;
|
import org.apache.hadoop.hdds.scm.ByteStringConversion;
|
||||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
@ -102,6 +103,7 @@ public class KeyValueHandler extends Handler {
|
|||||||
private final ChunkManager chunkManager;
|
private final ChunkManager chunkManager;
|
||||||
private final VolumeChoosingPolicy volumeChoosingPolicy;
|
private final VolumeChoosingPolicy volumeChoosingPolicy;
|
||||||
private final long maxContainerSize;
|
private final long maxContainerSize;
|
||||||
|
private final Function<ByteBuffer, ByteString> byteBufferToByteString;
|
||||||
|
|
||||||
// A lock that is held during container creation.
|
// A lock that is held during container creation.
|
||||||
private final AutoCloseableLock containerCreationLock;
|
private final AutoCloseableLock containerCreationLock;
|
||||||
@ -125,10 +127,8 @@ public class KeyValueHandler extends Handler {
|
|||||||
// this handler lock is used for synchronizing createContainer Requests,
|
// this handler lock is used for synchronizing createContainer Requests,
|
||||||
// so using a fair lock here.
|
// so using a fair lock here.
|
||||||
containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
|
containerCreationLock = new AutoCloseableLock(new ReentrantLock(true));
|
||||||
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
|
byteBufferToByteString =
|
||||||
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
|
ByteStringConversion.createByteBufferConversion(conf);
|
||||||
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
|
|
||||||
ByteStringHelper.init(isUnsafeByteOperationsEnabled);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
@ -547,7 +547,7 @@ public class KeyValueHandler extends Handler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// The container can become unhealthy after the lock is released.
|
// The container can become unhealthy after the lock is released.
|
||||||
// The operation will likely fail/timeout in that happens.
|
// The operation will likely fail/timeout if that happens.
|
||||||
try {
|
try {
|
||||||
checkContainerIsHealthy(kvContainer);
|
checkContainerIsHealthy(kvContainer);
|
||||||
} catch (StorageContainerException sce) {
|
} catch (StorageContainerException sce) {
|
||||||
@ -555,7 +555,7 @@ public class KeyValueHandler extends Handler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ChunkInfo chunkInfo;
|
ChunkInfo chunkInfo;
|
||||||
byte[] data;
|
ByteBuffer data;
|
||||||
try {
|
try {
|
||||||
BlockID blockID = BlockID.getFromProtobuf(
|
BlockID blockID = BlockID.getFromProtobuf(
|
||||||
request.getReadChunk().getBlockID());
|
request.getReadChunk().getBlockID());
|
||||||
@ -569,7 +569,7 @@ public class KeyValueHandler extends Handler {
|
|||||||
|
|
||||||
data = chunkManager
|
data = chunkManager
|
||||||
.readChunk(kvContainer, blockID, chunkInfo, dispatcherContext);
|
.readChunk(kvContainer, blockID, chunkInfo, dispatcherContext);
|
||||||
metrics.incContainerBytesStats(Type.ReadChunk, data.length);
|
metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen());
|
||||||
} catch (StorageContainerException ex) {
|
} catch (StorageContainerException ex) {
|
||||||
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
return ContainerUtils.logAndReturnError(LOG, ex, request);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
@ -578,7 +578,18 @@ public class KeyValueHandler extends Handler {
|
|||||||
request);
|
request);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);
|
Preconditions.checkNotNull(data, "Chunk data is null");
|
||||||
|
|
||||||
|
ContainerProtos.ReadChunkResponseProto.Builder response =
|
||||||
|
ContainerProtos.ReadChunkResponseProto.newBuilder();
|
||||||
|
response.setChunkData(chunkInfo.getProtoBufMessage());
|
||||||
|
response.setData(byteBufferToByteString.apply(data));
|
||||||
|
response.setBlockID(request.getReadChunk().getBlockID());
|
||||||
|
|
||||||
|
ContainerCommandResponseProto.Builder builder =
|
||||||
|
ContainerUtils.getSuccessResponseBuilder(request);
|
||||||
|
builder.setReadChunk(response);
|
||||||
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -800,9 +811,9 @@ public class KeyValueHandler extends Handler {
|
|||||||
for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
|
for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
|
||||||
// if the block is committed, all chunks must have been committed.
|
// if the block is committed, all chunks must have been committed.
|
||||||
// Tmp chunk files won't exist here.
|
// Tmp chunk files won't exist here.
|
||||||
byte[] data = chunkManager.readChunk(kvContainer, blockID,
|
ByteBuffer data = chunkManager.readChunk(kvContainer, blockID,
|
||||||
ChunkInfo.getFromProtoBuf(chunk), dispatcherContext);
|
ChunkInfo.getFromProtoBuf(chunk), dispatcherContext);
|
||||||
ByteString current = ByteString.copyFrom(data);
|
ByteString current = byteBufferToByteString.apply(data);
|
||||||
dataBuf = dataBuf.concat(current);
|
dataBuf = dataBuf.concat(current);
|
||||||
chunkInfo = chunk;
|
chunkInfo = chunk;
|
||||||
}
|
}
|
||||||
|
@ -24,9 +24,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|||||||
.ContainerCommandRequestProto;
|
.ContainerCommandRequestProto;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ContainerCommandResponseProto;
|
.ContainerCommandResponseProto;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
|
||||||
.ReadChunkResponseProto;
|
|
||||||
import org.apache.hadoop.hdds.scm.ByteStringHelper;
|
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
@ -142,8 +139,7 @@ public final class ChunkUtils {
|
|||||||
* @return ByteBuffer
|
* @return ByteBuffer
|
||||||
*/
|
*/
|
||||||
public static ByteBuffer readData(File chunkFile, ChunkInfo data,
|
public static ByteBuffer readData(File chunkFile, ChunkInfo data,
|
||||||
VolumeIOStats volumeIOStats) throws StorageContainerException,
|
VolumeIOStats volumeIOStats) throws StorageContainerException {
|
||||||
ExecutionException, InterruptedException {
|
|
||||||
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
|
||||||
|
|
||||||
if (!chunkFile.exists()) {
|
if (!chunkFile.exists()) {
|
||||||
@ -168,6 +164,7 @@ public final class ChunkUtils {
|
|||||||
|
|
||||||
try (FileLock ignored = file.lock(offset, len, true)) {
|
try (FileLock ignored = file.lock(offset, len, true)) {
|
||||||
file.read(buf, offset);
|
file.read(buf, offset);
|
||||||
|
buf.flip();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Increment volumeIO stats here.
|
// Increment volumeIO stats here.
|
||||||
@ -287,33 +284,6 @@ public final class ChunkUtils {
|
|||||||
return ContainerUtils.getSuccessResponse(msg);
|
return ContainerUtils.getSuccessResponse(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets a response to the read chunk calls.
|
|
||||||
*
|
|
||||||
* @param msg - Msg
|
|
||||||
* @param data - Data
|
|
||||||
* @param info - Info
|
|
||||||
* @return Response.
|
|
||||||
*/
|
|
||||||
public static ContainerCommandResponseProto getReadChunkResponse(
|
|
||||||
ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
|
|
||||||
Preconditions.checkNotNull(msg);
|
|
||||||
Preconditions.checkNotNull(data, "Chunk data is null");
|
|
||||||
Preconditions.checkNotNull(info, "Chunk Info is null");
|
|
||||||
|
|
||||||
ReadChunkResponseProto.Builder response =
|
|
||||||
ReadChunkResponseProto.newBuilder();
|
|
||||||
response.setChunkData(info.getProtoBufMessage());
|
|
||||||
response.setData(
|
|
||||||
ByteStringHelper.getByteString(data));
|
|
||||||
response.setBlockID(msg.getReadChunk().getBlockID());
|
|
||||||
|
|
||||||
ContainerCommandResponseProto.Builder builder =
|
|
||||||
ContainerUtils.getSuccessResponseBuilder(msg);
|
|
||||||
builder.setReadChunk(response);
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static <T, E extends Exception> T processFileExclusively(
|
static <T, E extends Exception> T processFileExclusively(
|
||||||
Path path, CheckedSupplier<T, E> op
|
Path path, CheckedSupplier<T, E> op
|
||||||
|
@ -120,8 +120,8 @@ public class ChunkManagerDummyImpl extends ChunkManagerImpl {
|
|||||||
* TODO: Explore if we need to do that for ozone.
|
* TODO: Explore if we need to do that for ozone.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
|
public ByteBuffer readChunk(Container container, BlockID blockID,
|
||||||
DispatcherContext dispatcherContext) {
|
ChunkInfo info, DispatcherContext dispatcherContext) {
|
||||||
|
|
||||||
long readStartTime = Time.monotonicNow();
|
long readStartTime = Time.monotonicNow();
|
||||||
|
|
||||||
@ -138,7 +138,7 @@ public class ChunkManagerDummyImpl extends ChunkManagerImpl {
|
|||||||
volumeIOStats.incReadOpCount();
|
volumeIOStats.incReadOpCount();
|
||||||
volumeIOStats.incReadBytes(info.getLen());
|
volumeIOStats.incReadBytes(info.getLen());
|
||||||
|
|
||||||
return data.array();
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -195,44 +195,34 @@ public class ChunkManagerImpl implements ChunkManager {
|
|||||||
* TODO: Right now we do not support partial reads and writes of chunks.
|
* TODO: Right now we do not support partial reads and writes of chunks.
|
||||||
* TODO: Explore if we need to do that for ozone.
|
* TODO: Explore if we need to do that for ozone.
|
||||||
*/
|
*/
|
||||||
public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
|
public ByteBuffer readChunk(Container container, BlockID blockID,
|
||||||
DispatcherContext dispatcherContext) throws StorageContainerException {
|
ChunkInfo info, DispatcherContext dispatcherContext)
|
||||||
try {
|
throws StorageContainerException {
|
||||||
KeyValueContainerData containerData = (KeyValueContainerData) container
|
KeyValueContainerData containerData = (KeyValueContainerData) container
|
||||||
.getContainerData();
|
.getContainerData();
|
||||||
ByteBuffer data;
|
ByteBuffer data;
|
||||||
HddsVolume volume = containerData.getVolume();
|
HddsVolume volume = containerData.getVolume();
|
||||||
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
|
VolumeIOStats volumeIOStats = volume.getVolumeIOStats();
|
||||||
|
|
||||||
// Checking here, which layout version the container is, and reading
|
// Checking here, which layout version the container is, and reading
|
||||||
// the chunk file in that format.
|
// the chunk file in that format.
|
||||||
// In version1, we verify checksum if it is available and return data
|
// In version1, we verify checksum if it is available and return data
|
||||||
// of the chunk file.
|
// of the chunk file.
|
||||||
if (containerData.getLayOutVersion() == ChunkLayOutVersion
|
if (containerData.getLayOutVersion() == ChunkLayOutVersion
|
||||||
.getLatestVersion().getVersion()) {
|
.getLatestVersion().getVersion()) {
|
||||||
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
|
File chunkFile = ChunkUtils.getChunkFile(containerData, info);
|
||||||
|
|
||||||
// In case the chunk file does not exist but tmp chunk file exist,
|
// In case the chunk file does not exist but tmp chunk file exist,
|
||||||
// read from tmp chunk file if readFromTmpFile is set to true
|
// read from tmp chunk file if readFromTmpFile is set to true
|
||||||
if (!chunkFile.exists() && dispatcherContext != null
|
if (!chunkFile.exists() && dispatcherContext != null
|
||||||
&& dispatcherContext.isReadFromTmpFile()) {
|
&& dispatcherContext.isReadFromTmpFile()) {
|
||||||
chunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
|
chunkFile = getTmpChunkFile(chunkFile, dispatcherContext);
|
||||||
}
|
|
||||||
data = ChunkUtils.readData(chunkFile, info, volumeIOStats);
|
|
||||||
containerData.incrReadCount();
|
|
||||||
long length = chunkFile.length();
|
|
||||||
containerData.incrReadBytes(length);
|
|
||||||
return data.array();
|
|
||||||
}
|
}
|
||||||
} catch (ExecutionException ex) {
|
data = ChunkUtils.readData(chunkFile, info, volumeIOStats);
|
||||||
LOG.error("read data failed. error: {}", ex);
|
containerData.incrReadCount();
|
||||||
throw new StorageContainerException("Internal error: ",
|
long length = chunkFile.length();
|
||||||
ex, CONTAINER_INTERNAL_ERROR);
|
containerData.incrReadBytes(length);
|
||||||
} catch (InterruptedException e) {
|
return data;
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
LOG.error("read data failed. error: {}", e);
|
|
||||||
throw new StorageContainerException("Internal error: ",
|
|
||||||
e, CONTAINER_INTERNAL_ERROR);
|
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,7 @@ public interface ChunkManager {
|
|||||||
* TODO: Right now we do not support partial reads and writes of chunks.
|
* TODO: Right now we do not support partial reads and writes of chunks.
|
||||||
* TODO: Explore if we need to do that for ozone.
|
* TODO: Explore if we need to do that for ozone.
|
||||||
*/
|
*/
|
||||||
byte[] readChunk(Container container, BlockID blockID, ChunkInfo info,
|
ByteBuffer readChunk(Container container, BlockID blockID, ChunkInfo info,
|
||||||
DispatcherContext dispatcherContext) throws StorageContainerException;
|
DispatcherContext dispatcherContext) throws StorageContainerException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -41,7 +41,6 @@ import org.mockito.Mockito;
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
@ -65,7 +64,7 @@ public class TestChunkManagerImpl {
|
|||||||
private BlockID blockID;
|
private BlockID blockID;
|
||||||
private ChunkManagerImpl chunkManager;
|
private ChunkManagerImpl chunkManager;
|
||||||
private ChunkInfo chunkInfo;
|
private ChunkInfo chunkInfo;
|
||||||
private byte[] data;
|
private ByteBuffer data;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TemporaryFolder folder = new TemporaryFolder();
|
public TemporaryFolder folder = new TemporaryFolder();
|
||||||
@ -92,11 +91,11 @@ public class TestChunkManagerImpl {
|
|||||||
|
|
||||||
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
|
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
|
||||||
|
|
||||||
data = "testing write chunks".getBytes(UTF_8);
|
data = ByteBuffer.wrap("testing write chunks".getBytes(UTF_8));
|
||||||
// Creating BlockData
|
// Creating BlockData
|
||||||
blockID = new BlockID(1L, 1L);
|
blockID = new BlockID(1L, 1L);
|
||||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||||
.getLocalID(), 0), 0, data.length);
|
.getLocalID(), 0), 0, data.capacity());
|
||||||
|
|
||||||
// Create a ChunkManager object.
|
// Create a ChunkManager object.
|
||||||
chunkManager = new ChunkManagerImpl(true);
|
chunkManager = new ChunkManagerImpl(true);
|
||||||
@ -118,8 +117,8 @@ public class TestChunkManagerImpl {
|
|||||||
|
|
||||||
// As no chunks are written to the volume writeBytes should be 0
|
// As no chunks are written to the volume writeBytes should be 0
|
||||||
checkWriteIOStats(0, 0);
|
checkWriteIOStats(0, 0);
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||||
ByteBuffer.wrap(data), new DispatcherContext.Builder()
|
new DispatcherContext.Builder()
|
||||||
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build());
|
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).build());
|
||||||
// Now a chunk file is being written with Stage WRITE_DATA, so it should
|
// Now a chunk file is being written with Stage WRITE_DATA, so it should
|
||||||
// create a temporary chunk file.
|
// create a temporary chunk file.
|
||||||
@ -137,13 +136,13 @@ public class TestChunkManagerImpl {
|
|||||||
// As chunk write stage is WRITE_DATA, temp chunk file will be created.
|
// As chunk write stage is WRITE_DATA, temp chunk file will be created.
|
||||||
assertTrue(tempChunkFile.exists());
|
assertTrue(tempChunkFile.exists());
|
||||||
|
|
||||||
checkWriteIOStats(data.length, 1);
|
checkWriteIOStats(data.capacity(), 1);
|
||||||
|
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||||
ByteBuffer.wrap(data), new DispatcherContext.Builder()
|
new DispatcherContext.Builder()
|
||||||
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build());
|
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA).build());
|
||||||
|
|
||||||
checkWriteIOStats(data.length, 1);
|
checkWriteIOStats(data.capacity(), 1);
|
||||||
|
|
||||||
// Old temp file should have been renamed to chunk file.
|
// Old temp file should have been renamed to chunk file.
|
||||||
assertTrue(chunksPath.listFiles().length == 1);
|
assertTrue(chunksPath.listFiles().length == 1);
|
||||||
@ -160,8 +159,8 @@ public class TestChunkManagerImpl {
|
|||||||
long randomLength = 200L;
|
long randomLength = 200L;
|
||||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||||
.getLocalID(), 0), 0, randomLength);
|
.getLocalID(), 0), 0, randomLength);
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||||
ByteBuffer.wrap(data), getDispatcherContext());
|
getDispatcherContext());
|
||||||
fail("testWriteChunkIncorrectLength failed");
|
fail("testWriteChunkIncorrectLength failed");
|
||||||
} catch (StorageContainerException ex) {
|
} catch (StorageContainerException ex) {
|
||||||
// As we got an exception, writeBytes should be 0.
|
// As we got an exception, writeBytes should be 0.
|
||||||
@ -181,35 +180,36 @@ public class TestChunkManagerImpl {
|
|||||||
// Initially chunks folder should be empty.
|
// Initially chunks folder should be empty.
|
||||||
assertTrue(chunksPath.listFiles().length == 0);
|
assertTrue(chunksPath.listFiles().length == 0);
|
||||||
checkWriteIOStats(0, 0);
|
checkWriteIOStats(0, 0);
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||||
ByteBuffer.wrap(data), getDispatcherContext());
|
getDispatcherContext());
|
||||||
// Now a chunk file is being written with Stage COMBINED_DATA, so it should
|
// Now a chunk file is being written with Stage COMBINED_DATA, so it should
|
||||||
// create a chunk file.
|
// create a chunk file.
|
||||||
assertTrue(chunksPath.listFiles().length == 1);
|
assertTrue(chunksPath.listFiles().length == 1);
|
||||||
File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo);
|
File chunkFile = ChunkUtils.getChunkFile(keyValueContainerData, chunkInfo);
|
||||||
assertTrue(chunkFile.exists());
|
assertTrue(chunkFile.exists());
|
||||||
checkWriteIOStats(data.length, 1);
|
checkWriteIOStats(data.capacity(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReadChunk() throws Exception {
|
public void testReadChunk() throws Exception {
|
||||||
checkWriteIOStats(0, 0);
|
checkWriteIOStats(0, 0);
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||||
ByteBuffer.wrap(data), getDispatcherContext());
|
getDispatcherContext());
|
||||||
checkWriteIOStats(data.length, 1);
|
checkWriteIOStats(data.capacity(), 1);
|
||||||
checkReadIOStats(0, 0);
|
checkReadIOStats(0, 0);
|
||||||
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
|
ByteBuffer expectedData = chunkManager.readChunk(keyValueContainer, blockID,
|
||||||
chunkInfo, getDispatcherContext());
|
chunkInfo, getDispatcherContext());
|
||||||
assertEquals(expectedData.length, data.length);
|
assertEquals(expectedData.limit()-expectedData.position(),
|
||||||
assertTrue(Arrays.equals(expectedData, data));
|
chunkInfo.getLen());
|
||||||
checkReadIOStats(data.length, 1);
|
assertTrue(expectedData.rewind().equals(data.rewind()));
|
||||||
|
checkReadIOStats(expectedData.capacity(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteChunk() throws Exception {
|
public void testDeleteChunk() throws Exception {
|
||||||
File chunksPath = new File(keyValueContainerData.getChunksPath());
|
File chunksPath = new File(keyValueContainerData.getChunksPath());
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||||
ByteBuffer.wrap(data), getDispatcherContext());
|
getDispatcherContext());
|
||||||
assertTrue(chunksPath.listFiles().length == 1);
|
assertTrue(chunksPath.listFiles().length == 1);
|
||||||
chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
|
chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
|
||||||
assertTrue(chunksPath.listFiles().length == 0);
|
assertTrue(chunksPath.listFiles().length == 0);
|
||||||
@ -218,8 +218,8 @@ public class TestChunkManagerImpl {
|
|||||||
@Test
|
@Test
|
||||||
public void testDeleteChunkUnsupportedRequest() throws Exception {
|
public void testDeleteChunkUnsupportedRequest() throws Exception {
|
||||||
try {
|
try {
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||||
ByteBuffer.wrap(data), getDispatcherContext());
|
getDispatcherContext());
|
||||||
long randomLength = 200L;
|
long randomLength = 200L;
|
||||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||||
.getLocalID(), 0), 0, randomLength);
|
.getLocalID(), 0), 0, randomLength);
|
||||||
@ -235,8 +235,8 @@ public class TestChunkManagerImpl {
|
|||||||
public void testReadChunkFileNotExists() throws Exception {
|
public void testReadChunkFileNotExists() throws Exception {
|
||||||
try {
|
try {
|
||||||
// trying to read a chunk, where chunk file does not exist
|
// trying to read a chunk, where chunk file does not exist
|
||||||
byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID,
|
ByteBuffer expectedData = chunkManager.readChunk(keyValueContainer,
|
||||||
chunkInfo, getDispatcherContext());
|
blockID, chunkInfo, getDispatcherContext());
|
||||||
fail("testReadChunkFileNotExists failed");
|
fail("testReadChunkFileNotExists failed");
|
||||||
} catch (StorageContainerException ex) {
|
} catch (StorageContainerException ex) {
|
||||||
GenericTestUtils.assertExceptionContains("Unable to find the chunk " +
|
GenericTestUtils.assertExceptionContains("Unable to find the chunk " +
|
||||||
@ -249,20 +249,21 @@ public class TestChunkManagerImpl {
|
|||||||
public void testWriteAndReadChunkMultipleTimes() throws Exception {
|
public void testWriteAndReadChunkMultipleTimes() throws Exception {
|
||||||
for (int i=0; i<100; i++) {
|
for (int i=0; i<100; i++) {
|
||||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||||
.getLocalID(), i), 0, data.length);
|
.getLocalID(), i), 0, data.capacity());
|
||||||
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
|
||||||
ByteBuffer.wrap(data), getDispatcherContext());
|
getDispatcherContext());
|
||||||
|
data.rewind();
|
||||||
}
|
}
|
||||||
checkWriteIOStats(data.length*100, 100);
|
checkWriteIOStats(data.capacity()*100, 100);
|
||||||
assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);
|
assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0);
|
||||||
|
|
||||||
for (int i=0; i<100; i++) {
|
for (int i=0; i<100; i++) {
|
||||||
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
|
||||||
.getLocalID(), i), 0, data.length);
|
.getLocalID(), i), 0, data.capacity());
|
||||||
chunkManager.readChunk(keyValueContainer, blockID, chunkInfo,
|
chunkManager.readChunk(keyValueContainer, blockID, chunkInfo,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
}
|
}
|
||||||
checkReadIOStats(data.length*100, 100);
|
checkReadIOStats(data.capacity()*100, 100);
|
||||||
assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0);
|
assertTrue(hddsVolume.getVolumeIOStats().getReadTime() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ozone.client.io;
|
package org.apache.hadoop.ozone.client.io;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
@ -100,10 +101,17 @@ public class BlockOutputStreamEntryPool {
|
|||||||
Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
|
Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
|
||||||
Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
|
Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
|
||||||
this.bufferPool =
|
this.bufferPool =
|
||||||
new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize);
|
new BufferPool(chunkSize, (int) streamBufferMaxSize / chunkSize,
|
||||||
|
xceiverClientManager.byteBufferToByteStringConversion());
|
||||||
}
|
}
|
||||||
|
|
||||||
public BlockOutputStreamEntryPool() {
|
/**
|
||||||
|
* A constructor for testing purpose only.
|
||||||
|
*
|
||||||
|
* @see KeyOutputStream#KeyOutputStream()
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
BlockOutputStreamEntryPool() {
|
||||||
streamEntries = new ArrayList<>();
|
streamEntries = new ArrayList<>();
|
||||||
omClient = null;
|
omClient = null;
|
||||||
keyArgs = null;
|
keyArgs = null;
|
||||||
|
@ -30,7 +30,6 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
|||||||
import org.apache.hadoop.hdds.protocol.StorageType;
|
import org.apache.hadoop.hdds.protocol.StorageType;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.ChecksumType;
|
.ChecksumType;
|
||||||
import org.apache.hadoop.hdds.scm.ByteStringHelper;
|
|
||||||
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
|
||||||
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
import org.apache.hadoop.hdds.tracing.TracingUtil;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
@ -219,10 +218,6 @@ public class RpcClient implements ClientProtocol {
|
|||||||
OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL,
|
OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL,
|
||||||
OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT);
|
OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT);
|
||||||
dtService = getOMProxyProvider().getCurrentProxyDelegationToken();
|
dtService = getOMProxyProvider().getCurrentProxyDelegationToken();
|
||||||
boolean isUnsafeByteOperationsEnabled = conf.getBoolean(
|
|
||||||
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
|
|
||||||
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
|
|
||||||
ByteStringHelper.init(isUnsafeByteOperationsEnabled);
|
|
||||||
topologyAwareReadEnabled = conf.getBoolean(
|
topologyAwareReadEnabled = conf.getBoolean(
|
||||||
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
|
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
|
||||||
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
|
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
|
||||||
|
@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -193,10 +194,10 @@ public final class ContainerTestHelper {
|
|||||||
* @param len - Number of bytes.
|
* @param len - Number of bytes.
|
||||||
* @return byte array with valid data.
|
* @return byte array with valid data.
|
||||||
*/
|
*/
|
||||||
public static byte[] getData(int len) {
|
public static ByteBuffer getData(int len) {
|
||||||
byte[] data = new byte[len];
|
byte[] data = new byte[len];
|
||||||
r.nextBytes(data);
|
r.nextBytes(data);
|
||||||
return data;
|
return ByteBuffer.wrap(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -206,7 +207,7 @@ public final class ContainerTestHelper {
|
|||||||
* @param data - data array
|
* @param data - data array
|
||||||
* @throws NoSuchAlgorithmException
|
* @throws NoSuchAlgorithmException
|
||||||
*/
|
*/
|
||||||
public static void setDataChecksum(ChunkInfo info, byte[] data)
|
public static void setDataChecksum(ChunkInfo info, ByteBuffer data)
|
||||||
throws OzoneChecksumException {
|
throws OzoneChecksumException {
|
||||||
Checksum checksum = new Checksum();
|
Checksum checksum = new Checksum();
|
||||||
info.setChecksumData(checksum.computeChecksum(data));
|
info.setChecksumData(checksum.computeChecksum(data));
|
||||||
@ -232,7 +233,7 @@ public final class ContainerTestHelper {
|
|||||||
|
|
||||||
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
|
writeRequest.setBlockID(blockID.getDatanodeBlockIDProtobuf());
|
||||||
|
|
||||||
byte[] data = getData(datalen);
|
ByteBuffer data = getData(datalen);
|
||||||
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
|
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, datalen);
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
|
|
||||||
@ -262,7 +263,7 @@ public final class ContainerTestHelper {
|
|||||||
throws Exception {
|
throws Exception {
|
||||||
ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest =
|
ContainerProtos.PutSmallFileRequestProto.Builder smallFileRequest =
|
||||||
ContainerProtos.PutSmallFileRequestProto.newBuilder();
|
ContainerProtos.PutSmallFileRequestProto.newBuilder();
|
||||||
byte[] data = getData(dataLen);
|
ByteBuffer data = getData(dataLen);
|
||||||
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, dataLen);
|
ChunkInfo info = getChunk(blockID.getLocalID(), 0, 0, dataLen);
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
|
|
||||||
|
@ -70,7 +70,6 @@ import java.nio.file.Paths;
|
|||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
@ -349,11 +348,11 @@ public class TestContainerPersistence {
|
|||||||
}
|
}
|
||||||
ChunkInfo info = getChunk(
|
ChunkInfo info = getChunk(
|
||||||
blockID.getLocalID(), 0, 0, datalen);
|
blockID.getLocalID(), 0, 0, datalen);
|
||||||
byte[] data = getData(datalen);
|
ByteBuffer data = getData(datalen);
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
commitBytesBefore = container.getContainerData()
|
commitBytesBefore = container.getContainerData()
|
||||||
.getVolume().getCommittedBytes();
|
.getVolume().getCommittedBytes();
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
commitBytesAfter = container.getContainerData()
|
commitBytesAfter = container.getContainerData()
|
||||||
.getVolume().getCommittedBytes();
|
.getVolume().getCommittedBytes();
|
||||||
@ -397,9 +396,9 @@ public class TestContainerPersistence {
|
|||||||
Map<String, ChunkInfo> fileHashMap = new HashMap<>();
|
Map<String, ChunkInfo> fileHashMap = new HashMap<>();
|
||||||
for (int x = 0; x < chunkCount; x++) {
|
for (int x = 0; x < chunkCount; x++) {
|
||||||
ChunkInfo info = getChunk(blockID.getLocalID(), x, 0, datalen);
|
ChunkInfo info = getChunk(blockID.getLocalID(), x, 0, datalen);
|
||||||
byte[] data = getData(datalen);
|
ByteBuffer data = getData(datalen);
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
|
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
|
||||||
fileHashMap.put(fileName, info);
|
fileHashMap.put(fileName, info);
|
||||||
@ -431,7 +430,7 @@ public class TestContainerPersistence {
|
|||||||
for (int x = 0; x < chunkCount; x++) {
|
for (int x = 0; x < chunkCount; x++) {
|
||||||
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
|
String fileName = String.format("%s.data.%d", blockID.getLocalID(), x);
|
||||||
ChunkInfo info = fileHashMap.get(fileName);
|
ChunkInfo info = fileHashMap.get(fileName);
|
||||||
byte[] data = chunkManager
|
ByteBuffer data = chunkManager
|
||||||
.readChunk(container, blockID, info, getDispatcherContext());
|
.readChunk(container, blockID, info, getDispatcherContext());
|
||||||
ChecksumData checksumData = checksum.computeChecksum(data);
|
ChecksumData checksumData = checksum.computeChecksum(data);
|
||||||
Assert.assertEquals(info.getChecksumData(), checksumData);
|
Assert.assertEquals(info.getChecksumData(), checksumData);
|
||||||
@ -456,21 +455,22 @@ public class TestContainerPersistence {
|
|||||||
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
|
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
|
||||||
ChunkInfo info = getChunk(
|
ChunkInfo info = getChunk(
|
||||||
blockID.getLocalID(), 0, 0, datalen);
|
blockID.getLocalID(), 0, 0, datalen);
|
||||||
byte[] data = getData(datalen);
|
ByteBuffer data = getData(datalen);
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
|
|
||||||
byte[] readData = chunkManager
|
ByteBuffer readData = chunkManager
|
||||||
.readChunk(container, blockID, info, getDispatcherContext());
|
.readChunk(container, blockID, info, getDispatcherContext());
|
||||||
assertTrue(Arrays.equals(data, readData));
|
assertTrue(data.rewind().equals(readData.rewind()));
|
||||||
|
|
||||||
ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
|
ChunkInfo info2 = getChunk(blockID.getLocalID(), 0, start, length);
|
||||||
byte[] readData2 = chunkManager
|
ByteBuffer readData2 = chunkManager
|
||||||
.readChunk(container, blockID, info2, getDispatcherContext());
|
.readChunk(container, blockID, info2, getDispatcherContext());
|
||||||
assertEquals(length, readData2.length);
|
assertEquals(length, info2.getLen());
|
||||||
assertTrue(Arrays.equals(
|
boolean equals =
|
||||||
Arrays.copyOfRange(data, start, start + length), readData2));
|
data.position(start).limit(start+length).equals(readData2.rewind());
|
||||||
|
assertTrue(equals);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -491,15 +491,17 @@ public class TestContainerPersistence {
|
|||||||
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
|
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
|
||||||
ChunkInfo info = getChunk(
|
ChunkInfo info = getChunk(
|
||||||
blockID.getLocalID(), 0, 0, datalen);
|
blockID.getLocalID(), 0, 0, datalen);
|
||||||
byte[] data = getData(datalen);
|
ByteBuffer data = getData(datalen);
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
data.rewind();
|
||||||
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
|
data.rewind();
|
||||||
// With the overwrite flag it should work now.
|
// With the overwrite flag it should work now.
|
||||||
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
|
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
long bytesUsed = container.getContainerData().getBytesUsed();
|
long bytesUsed = container.getContainerData().getBytesUsed();
|
||||||
Assert.assertEquals(datalen, bytesUsed);
|
Assert.assertEquals(datalen, bytesUsed);
|
||||||
@ -531,17 +533,18 @@ public class TestContainerPersistence {
|
|||||||
long offset = x * datalen;
|
long offset = x * datalen;
|
||||||
ChunkInfo info = getChunk(
|
ChunkInfo info = getChunk(
|
||||||
blockID.getLocalID(), 0, offset, datalen);
|
blockID.getLocalID(), 0, offset, datalen);
|
||||||
byte[] data = getData(datalen);
|
ByteBuffer data = getData(datalen);
|
||||||
oldSha.update(data);
|
oldSha.update(data);
|
||||||
|
data.rewind();
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request to read the whole data in a single go.
|
// Request to read the whole data in a single go.
|
||||||
ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0,
|
ChunkInfo largeChunk = getChunk(blockID.getLocalID(), 0, 0,
|
||||||
datalen * chunkCount);
|
datalen * chunkCount);
|
||||||
byte[] newdata =
|
ByteBuffer newdata =
|
||||||
chunkManager.readChunk(container, blockID, largeChunk,
|
chunkManager.readChunk(container, blockID, largeChunk,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
MessageDigest newSha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
|
||||||
@ -566,9 +569,9 @@ public class TestContainerPersistence {
|
|||||||
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
|
BlockID blockID = ContainerTestHelper.getTestBlockID(testContainerID);
|
||||||
ChunkInfo info = getChunk(
|
ChunkInfo info = getChunk(
|
||||||
blockID.getLocalID(), 0, 0, datalen);
|
blockID.getLocalID(), 0, 0, datalen);
|
||||||
byte[] data = getData(datalen);
|
ByteBuffer data = getData(datalen);
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
chunkManager.deleteChunk(container, blockID, info);
|
chunkManager.deleteChunk(container, blockID, info);
|
||||||
exception.expect(StorageContainerException.class);
|
exception.expect(StorageContainerException.class);
|
||||||
@ -681,9 +684,9 @@ public class TestContainerPersistence {
|
|||||||
for (int x = 1; x < chunkCount; x++) {
|
for (int x = 1; x < chunkCount; x++) {
|
||||||
// with holes in the front (before x * datalen)
|
// with holes in the front (before x * datalen)
|
||||||
info = getChunk(blockID.getLocalID(), x, x * datalen, datalen);
|
info = getChunk(blockID.getLocalID(), x, x * datalen, datalen);
|
||||||
byte[] data = getData(datalen);
|
ByteBuffer data = getData(datalen);
|
||||||
setDataChecksum(info, data);
|
setDataChecksum(info, data);
|
||||||
chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data),
|
chunkManager.writeChunk(container, blockID, info, data,
|
||||||
getDispatcherContext());
|
getDispatcherContext());
|
||||||
totalSize += datalen;
|
totalSize += datalen;
|
||||||
chunkList.add(info);
|
chunkList.add(info);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user