HDFS-11006. Ozone: support setting chunk size in streaming API. Contributed by Yiqun Lin.

This commit is contained in:
Anu Engineer 2017-04-05 11:07:28 -07:00 committed by Owen O'Malley
parent cd361fa490
commit 57b2a9b17d
4 changed files with 44 additions and 9 deletions

View File

@@ -45,7 +45,11 @@ public final class ScmConfigKeys {
= "GRPC"; = "GRPC";
// TODO : this is copied from OzoneConsts, may need to move to a better place // TODO : this is copied from OzoneConsts, may need to move to a better place
public static final int CHUNK_SIZE = 1 * 1024 * 1024; // 1 MB public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
// 1 MB by default
public static final int OZONE_SCM_CHUNK_SIZE_DEFAULT = 1 * 1024 * 1024;
public static final int OZONE_SCM_CHUNK_MAX_SIZE = 1 * 1024 * 1024;
public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860; public static final int OZONE_SCM_CLIENT_PORT_DEFAULT = 9860;
public static final int OZONE_SCM_DATANODE_PORT_DEFAULT = 9861; public static final int OZONE_SCM_DATANODE_PORT_DEFAULT = 9861;

View File

@@ -31,7 +31,6 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyValue;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.XceiverClientSpi; import org.apache.hadoop.scm.XceiverClientSpi;
@@ -62,6 +61,7 @@ public class ChunkOutputStream extends OutputStream {
private ByteBuffer buffer; private ByteBuffer buffer;
private final String streamId; private final String streamId;
private int chunkIndex; private int chunkIndex;
private int chunkSize;
/** /**
* Creates a new ChunkOutputStream. * Creates a new ChunkOutputStream.
@@ -71,13 +71,15 @@ public class ChunkOutputStream extends OutputStream {
* @param xceiverClientManager client manager that controls client * @param xceiverClientManager client manager that controls client
* @param xceiverClient client to perform container calls * @param xceiverClient client to perform container calls
* @param traceID container protocol call args * @param traceID container protocol call args
* @param chunkSize chunk size
*/ */
public ChunkOutputStream(String containerKey, String key, public ChunkOutputStream(String containerKey, String key,
XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
String traceID) { String traceID, int chunkSize) {
this.containerKey = containerKey; this.containerKey = containerKey;
this.key = key; this.key = key;
this.traceID = traceID; this.traceID = traceID;
this.chunkSize = chunkSize;
KeyValue keyValue = KeyValue.newBuilder() KeyValue keyValue = KeyValue.newBuilder()
.setKey("TYPE").setValue("KEY").build(); .setKey("TYPE").setValue("KEY").build();
this.containerKeyData = KeyData.newBuilder() this.containerKeyData = KeyData.newBuilder()
@@ -86,7 +88,7 @@ public ChunkOutputStream(String containerKey, String key,
.addMetadata(keyValue); .addMetadata(keyValue);
this.xceiverClientManager = xceiverClientManager; this.xceiverClientManager = xceiverClientManager;
this.xceiverClient = xceiverClient; this.xceiverClient = xceiverClient;
this.buffer = ByteBuffer.allocate(ScmConfigKeys.CHUNK_SIZE); this.buffer = ByteBuffer.allocate(chunkSize);
this.streamId = UUID.randomUUID().toString(); this.streamId = UUID.randomUUID().toString();
this.chunkIndex = 0; this.chunkIndex = 0;
} }
@@ -97,7 +99,7 @@ public synchronized void write(int b) throws IOException {
int rollbackPosition = buffer.position(); int rollbackPosition = buffer.position();
int rollbackLimit = buffer.limit(); int rollbackLimit = buffer.limit();
buffer.put((byte)b); buffer.put((byte)b);
if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) { if (buffer.position() == chunkSize) {
flushBufferToChunk(rollbackPosition, rollbackLimit); flushBufferToChunk(rollbackPosition, rollbackLimit);
} }
} }
@@ -116,12 +118,11 @@ public void write(byte[] b, int off, int len) throws IOException {
} }
checkOpen(); checkOpen();
while (len > 0) { while (len > 0) {
int writeLen = Math.min( int writeLen = Math.min(chunkSize - buffer.position(), len);
ScmConfigKeys.CHUNK_SIZE - buffer.position(), len);
int rollbackPosition = buffer.position(); int rollbackPosition = buffer.position();
int rollbackLimit = buffer.limit(); int rollbackLimit = buffer.limit();
buffer.put(b, off, writeLen); buffer.put(b, off, writeLen);
if (buffer.position() == ScmConfigKeys.CHUNK_SIZE) { if (buffer.position() == chunkSize) {
flushBufferToChunk(rollbackPosition, rollbackLimit); flushBufferToChunk(rollbackPosition, rollbackLimit);
} }
off += writeLen; off += writeLen;

View File

@@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.protocol.LocatedContainer; import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
@@ -40,6 +41,8 @@
import org.apache.hadoop.scm.storage.ChunkInputStream; import org.apache.hadoop.scm.storage.ChunkInputStream;
import org.apache.hadoop.scm.storage.ChunkOutputStream; import org.apache.hadoop.scm.storage.ChunkOutputStream;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
@@ -55,11 +58,15 @@
* across the nodes of an HDFS cluster. * across the nodes of an HDFS cluster.
*/ */
public final class DistributedStorageHandler implements StorageHandler { public final class DistributedStorageHandler implements StorageHandler {
private static final Logger LOG =
LoggerFactory.getLogger(DistributedStorageHandler.class);
private final StorageContainerLocationProtocolClientSideTranslatorPB private final StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocation; storageContainerLocation;
private final XceiverClientManager xceiverClientManager; private final XceiverClientManager xceiverClientManager;
private int chunkSize;
/** /**
* Creates a new DistributedStorageHandler. * Creates a new DistributedStorageHandler.
* *
@@ -71,6 +78,16 @@ public DistributedStorageHandler(OzoneConfiguration conf,
storageContainerLocation) { storageContainerLocation) {
this.storageContainerLocation = storageContainerLocation; this.storageContainerLocation = storageContainerLocation;
this.xceiverClientManager = new XceiverClientManager(conf); this.xceiverClientManager = new XceiverClientManager(conf);
chunkSize = conf.getInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_DEFAULT);
if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
LOG.warn("The chunk size ({}) is not allowed to be more than"
+ " the maximum size ({}),"
+ " resetting to the maximum size.",
chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
}
} }
@Override @Override
@@ -227,7 +244,8 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException,
key.setCreatedOn(dateToString(new Date())); key.setCreatedOn(dateToString(new Date()));
XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey); XceiverClientSpi xceiverClient = acquireXceiverClient(containerKey);
return new ChunkOutputStream(containerKey, key.getKeyName(), return new ChunkOutputStream(containerKey, key.getKeyName(),
xceiverClientManager, xceiverClient, args.getRequestID()); xceiverClientManager, xceiverClient, args.getRequestID(),
chunkSize);
} }
@Override @Override

View File

@@ -110,4 +110,16 @@
The default is appropriate for small clusters (tens of nodes). The default is appropriate for small clusters (tens of nodes).
</description> </description>
</property> </property>
<property>
<name>ozone.scm.chunk.size</name>
<value>1048576</value>
<description>
The chunk size for read/write chunk operations in bytes.
The chunk size defaults to 1MB. If the value configured is more
than the maximum size (1MB), it will be reset to the maximum
size.
</description>
</property>
</configuration> </configuration>