From 57b2a9b17d187a9db6d9c73f757aea24e8d69282 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Wed, 5 Apr 2017 11:07:28 -0700 Subject: [PATCH] HDFS-11006. Ozone: support setting chunk size in streaming API. Contributed by Yiqun Lin. --- .../org/apache/hadoop/scm/ScmConfigKeys.java | 6 +++++- .../hadoop/scm/storage/ChunkOutputStream.java | 15 +++++++------- .../storage/DistributedStorageHandler.java | 20 ++++++++++++++++++- .../src/main/resources/ozone-default.xml | 12 +++++++++++ 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index fed4459729..06f06cc840 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -45,7 +45,11 @@ public final class ScmConfigKeys { = "GRPC"; // TODO : this is copied from OzoneConsts, may need to move to a better place - public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB + public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size"; + // 1 MB by default + public static final int OZONE_SCM_CHUNK_SIZE_DEFAULT = 1 * 1024 * 1024; + public static final int OZONE_SCM_CHUNK_MAX_SIZE = 1 * 1024 * 1024; + public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860; public static final int OZONE_SCM_DATANODE_PORT_DEFAULT = 9861; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java index 3e9a3d4db6..962ca6b3c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java @@ -31,7 +31,6 @@ import com.google.protobuf.ByteString; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue; -import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientSpi; @@ -62,6 +61,7 @@ public class ChunkOutputStream extends OutputStream { private ByteBuffer buffer; private final String streamId; private int chunkIndex; + private int chunkSize; /** * Creates a new ChunkOutputStream. @@ -71,13 +71,15 @@ public class ChunkOutputStream extends OutputStream { * @param xceiverClientManager client manager that controls client * @param xceiverClient client to perform container calls * @param traceID container protocol call args + * @param chunkSize chunk size */ public ChunkOutputStream(String containerKey, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, - String traceID) { + String traceID, int chunkSize) { this.containerKey = containerKey; this.key = key; this.traceID = traceID; + this.chunkSize = chunkSize; KeyValue keyValue = KeyValue.newBuilder() .setKey("TYPE").setValue("KEY").build(); this.containerKeyData = KeyData.newBuilder() @@ -86,7 +88,7 @@ public class ChunkOutputStream extends OutputStream { .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(ScmConfigKeys.CHUNK_SIZE); + this.buffer = ByteBuffer.allocate(chunkSize); this.streamId = UUID.randomUUID().toString(); this.chunkIndex = 0; } @@ -97,7 +99,7 @@ public class ChunkOutputStream extends OutputStream { int rollbackPosition = buffer.position(); int rollbackLimit = buffer.limit(); buffer.put((byte)b); - if 
(buffer.position() == ScmConfigKeys.CHUNK_SIZE) { + if (buffer.position() == chunkSize) { flushBufferToChunk(rollbackPosition, rollbackLimit); } } @@ -116,12 +118,11 @@ public class ChunkOutputStream extends OutputStream { } checkOpen(); while (len > 0) { - int writeLen = Math.min( - ScmConfigKeys.CHUNK_SIZE - buffer.position(), len); + int writeLen = Math.min(chunkSize - buffer.position(), len); int rollbackPosition = buffer.position(); int rollbackLimit = buffer.limit(); buffer.put(b, off, writeLen); - if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) { + if (buffer.position() == chunkSize) { flushBufferToChunk(rollbackPosition, rollbackLimit); } off += writeLen; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index ef0dc18544..f30f2ae048 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.scm.ScmConfigKeys; import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.protocol.LocatedContainer; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; @@ -40,6 +41,8 @@ import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.storage.ChunkInputStream; import org.apache.hadoop.scm.storage.ChunkOutputStream; import org.apache.hadoop.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; 
import java.io.OutputStream; @@ -55,11 +58,15 @@ import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; * across the nodes of an HDFS cluster. */ public final class DistributedStorageHandler implements StorageHandler { + private static final Logger LOG = + LoggerFactory.getLogger(DistributedStorageHandler.class); private final StorageContainerLocationProtocolClientSideTranslatorPB storageContainerLocation; private final XceiverClientManager xceiverClientManager; + private int chunkSize; + /** * Creates a new DistributedStorageHandler. * @@ -71,6 +78,16 @@ public final class DistributedStorageHandler implements StorageHandler { storageContainerLocation) { this.storageContainerLocation = storageContainerLocation; this.xceiverClientManager = new XceiverClientManager(conf); + + chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, + ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT); + if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) { + LOG.warn("The chunk size ({}) is not allowed to be more than" + + " the maximum size ({})," + + " resetting to the maximum size.", + chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE); + chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; + } } @Override @@ -227,7 +244,8 @@ public final class DistributedStorageHandler implements StorageHandler { key.setCreatedOn(dateToString(new Date())); XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); return new ChunkOutputStream(containerKey, key.getKeyName(), - xceiverClientManager, xceiverClient, args.getRequestID()); + xceiverClientManager, xceiverClient, args.getRequestID(), + chunkSize); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index e3e758cda8..212c68a090 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -110,4 
+110,16 @@ The default is appropriate for small clusters (tens of nodes). </description> </property> + + <property> + <name>ozone.scm.chunk.size</name> + <value>1048576</value> + <description> + The chunk size for read/write chunk operations in bytes. + + The chunk size defaults to 1MB. If the value configured is more + than the maximum size (1MB), it will be reset to the maximum + size. + </description> + </property> </configuration>