From 98eeabbdb3c06f4b60ea2a0525879bb797232f95 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Thu, 11 Jan 2018 11:27:58 -0800 Subject: [PATCH] HDFS-12794. Ozone: Parallelize ChunkOutputSream Writes to container. Contributed by Shashikant Banerjee. --- .../apache/hadoop/ozone/OzoneConfigKeys.java | 3 + .../client/io/ChunkGroupOutputStream.java | 54 +++- .../hadoop/ozone/client/rpc/RpcClient.java | 10 + .../hadoop/scm/storage/ChunkOutputStream.java | 257 +++++++++++++----- .../scm/storage/ContainerProtocolCalls.java | 13 +- .../storage/DistributedStorageHandler.java | 10 + .../src/main/resources/ozone-default.xml | 9 + .../hadoop/ozone/ksm/TestKeySpaceManager.java | 2 +- 8 files changed, 273 insertions(+), 85 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 8059b5eee9..cb3f0f6193 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -107,6 +107,9 @@ public final class OzoneConfigKeys { "ozone.scm.block.size.in.mb"; public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256; + public static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB = + "ozone.output.stream.buffer.size.in.mb"; + public static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256; /** * Ozone administrator users delimited by comma. * If not set, only the user who launches an ozone service will be the diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index fe248e367b..a44a00930a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -19,7 +19,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; @@ -46,6 +49,9 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT; + /** * Maintaining a list of ChunkInputStream. Write based on offset. * @@ -72,6 +78,7 @@ public class ChunkGroupOutputStream extends OutputStream { private final XceiverClientManager xceiverClientManager; private final int chunkSize; private final String requestID; + private final long streamBufferSize; /** * A constructor for testing purpose only. @@ -86,6 +93,7 @@ public ChunkGroupOutputStream() { xceiverClientManager = null; chunkSize = 0; requestID = null; + streamBufferSize = OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB; } /** @@ -105,12 +113,26 @@ public List getStreamEntries() { return streamEntries; } - public ChunkGroupOutputStream( + /** + * Chunkoutput stream, making this package visible since this can be + * created only via builder. + * @param handler - Open Key state. + * @param xceiverClientManager - Communication Manager. + * @param scmClient - SCM protocol Client. + * @param ksmClient - KSM Protocol client + * @param chunkSize - Chunk Size - I/O + * @param requestId - Seed for trace ID generation. + * @param factor - Replication factor + * @param type - Replication Type - RATIS/Standalone etc. + * @param maxBufferSize - Maximum stream buffer Size. + * @throws IOException - Throws this exception if there is an error. + */ + ChunkGroupOutputStream( OpenKeySession handler, XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, KeySpaceManagerProtocolClientSideTranslatorPB ksmClient, int chunkSize, String requestId, ReplicationFactor factor, - ReplicationType type) throws IOException { + ReplicationType type, long maxBufferSize) throws IOException { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.byteOffset = 0; @@ -130,6 +152,7 @@ public ChunkGroupOutputStream( this.requestID = requestId; LOG.debug("Expecting open key with one block, but got" + info.getKeyLocationVersions().size()); + this.streamBufferSize = maxBufferSize; } /** @@ -184,7 +207,7 @@ private void checkKeyLocationInfo(KsmKeyLocationInfo subKeyInfo) } streamEntries.add(new ChunkOutputStreamEntry(containerKey, keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength())); + chunkSize, subKeyInfo.getLength(), this.streamBufferSize)); } @@ -324,6 +347,7 @@ public static class Builder { private String requestID; private ReplicationType type; private ReplicationFactor factor; + private long streamBufferSize; public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -367,9 +391,23 @@ public Builder setFactor(ReplicationFactor replicationFactor) { return this; } + public Builder setStreamBufferSize(long blockSize) { + this.streamBufferSize = blockSize; + return this; + } + public ChunkGroupOutputStream build() throws IOException { + Preconditions.checkNotNull(openHandler); + Preconditions.checkNotNull(xceiverManager); + Preconditions.checkNotNull(scmClient); + Preconditions.checkNotNull(ksmClient); + Preconditions.checkState(chunkSize > 0); + Preconditions.checkState(StringUtils.isNotEmpty(requestID)); + Preconditions + .checkState(streamBufferSize > 0 && streamBufferSize > chunkSize); + return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, - ksmClient, chunkSize, requestID, factor, type); + ksmClient, chunkSize, requestID, factor, type, streamBufferSize); } } @@ -385,11 +423,12 @@ private static class ChunkOutputStreamEntry extends OutputStream { private final long length; // the current position of this stream 0 <= currentPosition < length private long currentPosition; + private long streamBufferSize; // Max block size. ChunkOutputStreamEntry(String containerKey, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String requestId, int chunkSize, - long length) { + long length, long streamBufferSize) { this.outputStream = null; this.containerKey = containerKey; this.key = key; @@ -400,6 +439,7 @@ private static class ChunkOutputStreamEntry extends OutputStream { this.length = length; this.currentPosition = 0; + this.streamBufferSize = streamBufferSize; } /** @@ -418,6 +458,8 @@ private static class ChunkOutputStreamEntry extends OutputStream { this.length = length; this.currentPosition = 0; + this.streamBufferSize = + OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB; } long getLength() { @@ -432,7 +474,7 @@ private synchronized void checkStream() { if (this.outputStream == null) { this.outputStream = new ChunkOutputStream(containerKey, key, xceiverClientManager, xceiverClient, - requestId, chunkSize); + requestId, chunkSize, Ints.checkedCast(streamBufferSize)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 94038e2407..20f2b54a2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -74,6 +74,11 @@ import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT; + /** * Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode * to execute client calls. This uses RPC protocol for communication @@ -94,6 +99,7 @@ public class RpcClient implements ClientProtocol { private final UserGroupInformation ugi; private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; + private final long streamBufferSize; /** * Creates RpcClient instance with the given configuration. @@ -148,6 +154,9 @@ public RpcClient(Configuration conf) throws IOException { } else { chunkSize = configuredChunkSize; } + // streamBufferSize by default is set equal to default scm block size. + streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB, + OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB; } @Override @@ -463,6 +472,7 @@ public OzoneOutputStream createKey( .setRequestID(requestId) .setType(OzoneProtos.ReplicationType.valueOf(type.toString())) .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue())) + .setStreamBufferSize(streamBufferSize) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java index 64c10dac88..916a506c47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java @@ -18,22 +18,32 @@ package org.apache.hadoop.scm.storage; -import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; -import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.UUID; - +import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; - import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue; import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientSpi; +import org.apache.hadoop.util.Time; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos + .Result.SUCCESS; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk; + /** * An {@link OutputStream} used by the REST service in combination with the @@ -57,12 +67,12 @@ public class ChunkOutputStream extends OutputStream { private final String key; private final String traceID; private final KeyData.Builder containerKeyData; + private final String streamId; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; private ByteBuffer buffer; - private final String streamId; - private int chunkIndex; private int chunkSize; + private int streamBufferSize; /** * Creates a new ChunkOutputStream. @@ -73,14 +83,18 @@ public class ChunkOutputStream extends OutputStream { * @param xceiverClient client to perform container calls * @param traceID container protocol call args * @param chunkSize chunk size + * @param maxBufferSize -- Controls the maximum amount of memory that we need + * to allocate data buffering. */ public ChunkOutputStream(String containerKey, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, - String traceID, int chunkSize) { + String traceID, int chunkSize, int maxBufferSize) { this.containerKey = containerKey; this.key = key; this.traceID = traceID; this.chunkSize = chunkSize; + this.streamBufferSize = maxBufferSize; + KeyValue keyValue = KeyValue.newBuilder() .setKey("TYPE").setValue("KEY").build(); this.containerKeyData = KeyData.newBuilder() @@ -89,22 +103,24 @@ public ChunkOutputStream(String containerKey, String key, .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(chunkSize); + this.buffer = ByteBuffer.allocate(maxBufferSize); this.streamId = UUID.randomUUID().toString(); - this.chunkIndex = 0; } + /** + * {@inheritDoc} + */ @Override public synchronized void write(int b) throws IOException { checkOpen(); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - buffer.put((byte)b); - if (buffer.position() == chunkSize) { - flushBufferToChunk(rollbackPosition, rollbackLimit); - } + byte[] c = new byte[1]; + c[0] = (byte) b; + write(c, 0, 1); } + /** + * {@inheritDoc} + */ @Override public void write(byte[] b, int off, int len) throws IOException { if (b == null) { @@ -118,26 +134,111 @@ public void write(byte[] b, int off, int len) throws IOException { return; } checkOpen(); - while (len > 0) { - int writeLen = Math.min(chunkSize - buffer.position(), len); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - buffer.put(b, off, writeLen); - if (buffer.position() == chunkSize) { - flushBufferToChunk(rollbackPosition, rollbackLimit); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + try { + List, ChunkInfo>> + writeFutures = writeInParallel(b, off, len); + // This is a rendezvous point for this function call, all chunk I/O + // for this block must complete before we can declare this call as + // complete. + + // Wait until all the futures complete or throws an exception if any of + // the calls ended with an exception this call will throw. + // if futures is null, it means that we wrote the data to the buffer and + // returned. + if (writeFutures != null) { + CompletableFuture.allOf(writeFutures.toArray(new + CompletableFuture[writeFutures.size()])).join(); + + // Wrote this data, we will clear this buffer now. + buffer.clear(); } - off += writeLen; - len -= writeLen; + } catch (InterruptedException | ExecutionException e) { + buffer.position(rollbackPosition); + buffer.limit(rollbackLimit); + throw new IOException("Unexpected error in write. ", e); } } + /** + * Write a given block into many small chunks in parallel. + * + * @param b + * @param off + * @param len + * @throws IOException + * @throws ExecutionException + * @throws InterruptedException + */ + public List, ChunkInfo>> + writeInParallel(byte[] b, int off, int len) + throws IOException, ExecutionException, InterruptedException { + + Preconditions.checkArgument(len <= streamBufferSize, + "A chunk write cannot be " + "larger than max buffer size limit."); + long newBlockCount = len / chunkSize; + buffer.put(b, off, len); + List, ChunkInfo>> + writeFutures = new LinkedList<>(); + + // We if must have at least a chunkSize of data ready to write, if so we + // will go ahead and start writing that data. + if (buffer.position() >= chunkSize) { + // Allocate new byte slices which will point to each chunk of data + // that we want to write. Divide the byte buffer into individual chunks + // each of length equals to chunkSize max where each chunk will be + // assigned a chunkId where, for each chunk the async write requests will + // be made and wait for all of them to return before the write call + // returns. + for (int chunkId = 0; chunkId < newBlockCount; chunkId++) { + // Please note : We are not flipping the slice when we write since + // the slices are pointing the buffer start and end as needed for + // the chunk write. Also please note, Duplicate does not create a + // copy of data, it only creates metadata that points to the data + // stream. + ByteBuffer chunk = buffer.duplicate(); + Preconditions.checkState((chunkId * chunkSize) < buffer.limit(), + "Chunk offset cannot be beyond the limits of the buffer."); + chunk.position(chunkId * chunkSize); + // Min handles the case where the last block might be lesser than + // chunk Size. + chunk.limit(chunk.position() + + Math.min(chunkSize, chunk.remaining() - (chunkId * chunkSize))); + + // Schedule all the writes, this is a non-block call which returns + // futures. We collect these futures and wait for all of them to + // complete in the next line. + writeFutures.add(writeChunkToContainer(chunk, 0, chunkSize)); + } + return writeFutures; + } + // Nothing to do , return null. + return null; + } + @Override public synchronized void flush() throws IOException { checkOpen(); if (buffer.position() > 0) { int rollbackPosition = buffer.position(); int rollbackLimit = buffer.limit(); - flushBufferToChunk(rollbackPosition, rollbackLimit); + ByteBuffer chunk = buffer.duplicate(); + try { + + ImmutablePair, ChunkInfo> + result = writeChunkToContainer(chunk, 0, chunkSize); + updateChunkInfo(result); + buffer.clear(); + } catch (ExecutionException | InterruptedException e) { + buffer.position(rollbackPosition); + buffer.limit(rollbackLimit); + throw new IOException("Failure in flush", e); + } } } @@ -147,10 +248,20 @@ public synchronized void close() throws IOException { buffer != null) { try { if (buffer.position() > 0) { - writeChunkToContainer(); + // This flip is needed since this is the real buffer to which we + // are writing and position will have moved each time we did a put. + buffer.flip(); + + // Call get immediately to make this call Synchronous. + + ImmutablePair, ChunkInfo> + result = writeChunkToContainer(buffer, 0, buffer.limit()); + updateChunkInfo(result); + buffer.clear(); } putKey(xceiverClient, containerKeyData.build(), traceID); - } catch (IOException e) { + } catch (IOException | InterruptedException | ExecutionException e) { throw new IOException( "Unexpected Storage Container Exception: " + e.toString(), e); } finally { @@ -163,6 +274,24 @@ public synchronized void close() throws IOException { } + private void updateChunkInfo( + ImmutablePair< + CompletableFuture, + ChunkInfo + > result) throws InterruptedException, ExecutionException { + // Wait for this call to complete. + ContainerProtos.ContainerCommandResponseProto response = + result.getLeft().get(); + + // If the write call to the chunk is successful, we need to add that + // chunk information to the containerKeyData. + // TODO: Clean up the garbage in case of failure. + if(response.getResult() == SUCCESS) { + ChunkInfo chunk = result.getRight(); + containerKeyData.addChunks(chunk); + } + } + /** * Checks if the stream is open. If not, throws an exception. * @@ -174,54 +303,36 @@ private synchronized void checkOpen() throws IOException { } } - /** - * Attempts to flush buffered writes by writing a new chunk to the container. - * If successful, then clears the buffer to prepare to receive writes for a - * new chunk. - * - * @param rollbackPosition position to restore in buffer if write fails - * @param rollbackLimit limit to restore in buffer if write fails - * @throws IOException if there is an I/O error while performing the call - */ - private synchronized void flushBufferToChunk(int rollbackPosition, - int rollbackLimit) throws IOException { - boolean success = false; - try { - writeChunkToContainer(); - success = true; - } finally { - if (success) { - buffer.clear(); - } else { - buffer.position(rollbackPosition); - buffer.limit(rollbackLimit); - } - } - } - /** * Writes buffered data as a new chunk to the container and saves chunk * information to be used later in putKey call. * - * @throws IOException if there is an I/O error while performing the call + * @param data -- Data to write. + * @param offset - offset to the data buffer + * @param len - Length in bytes + * @return Returns a Immutable pair -- A future object that will contian + * the result of the operation, and the chunkInfo that we wrote. + * + * @throws IOException + * @throws ExecutionException + * @throws InterruptedException */ - private synchronized void writeChunkToContainer() throws IOException { - buffer.flip(); - ByteString data = ByteString.copyFrom(buffer); - ChunkInfo chunk = ChunkInfo - .newBuilder() - .setChunkName( + private ImmutablePair< + CompletableFuture, + ChunkInfo> + writeChunkToContainer(ByteBuffer data, int offset, int len) + throws IOException, ExecutionException, InterruptedException { + + + ByteString dataString = ByteString.copyFrom(data); + ChunkInfo chunk = ChunkInfo.newBuilder().setChunkName( DigestUtils.md5Hex(key) + "_stream_" - + streamId + "_chunk_" + ++chunkIndex) + + streamId + "_chunk_" + Time.monotonicNowNanos()) .setOffset(0) - .setLen(data.size()) + .setLen(len) .build(); - try { - writeChunk(xceiverClient, chunk, key, data, traceID); - } catch (IOException e) { - throw new IOException( - "Unexpected Storage Container Exception: " + e.toString(), e); - } - containerKeyData.addChunks(chunk); + CompletableFuture response = + writeChunk(xceiverClient, chunk, key, dataString, traceID); + return new ImmutablePair(response, chunk); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java index 1cde67cabb..7d4c72d4e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java @@ -53,6 +53,9 @@ import org.apache.hadoop.scm.container.common.helpers.StorageContainerException; import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + import org.apache.hadoop.scm.XceiverClientSpi; /** @@ -162,9 +165,10 @@ public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient, * @param traceID container protocol call args * @throws IOException if there is an I/O error while performing the call */ - public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, - String key, ByteString data, String traceID) - throws IOException { + public static CompletableFuture writeChunk( + XceiverClientSpi xceiverClient, ChunkInfo chunk, String key, + ByteString data, String traceID) + throws IOException, ExecutionException, InterruptedException { WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto .newBuilder() .setPipeline(xceiverClient.getPipeline().getProtobufMessage()) @@ -179,8 +183,7 @@ public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk, .setDatanodeID(id) .setWriteChunk(writeChunkRequest) .build(); - ContainerCommandResponseProto response = xceiverClient.sendCommand(request); - validateContainerResponse(response); + return xceiverClient.sendCommandAsync(request); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java index 1830c71597..137f8f95d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java @@ -67,6 +67,11 @@ import java.io.OutputStream; import java.util.List; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB; +import static org.apache.hadoop.ozone.OzoneConfigKeys + .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT; + /** * A {@link StorageHandler} implementation that distributes object storage * across the nodes of an HDFS cluster. @@ -86,6 +91,7 @@ public final class DistributedStorageHandler implements StorageHandler { private final boolean useRatis; private final OzoneProtos.ReplicationType type; private final OzoneProtos.ReplicationFactor factor; + private final long streamBufferSize; /** * Creates a new DistributedStorageHandler. @@ -127,6 +133,9 @@ public DistributedStorageHandler(OzoneConfiguration conf, chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE); chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE; } + // streamBufferSize by default is set to default scm block size. + streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB, + OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB; } @Override @@ -418,6 +427,7 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException, .setRequestID(args.getRequestID()) .setType(xceiverClientManager.getType()) .setFactor(xceiverClientManager.getFactor()) + .setStreamBufferSize(streamBufferSize) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index 31c3901a95..4df99f92d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -690,6 +690,15 @@ Ozone block size. + + ozone.output.stream.buffer.size.in.mb + 256 + OZONE + + The maximum size of the buffer allocated for the ozone output stream for + write. Default size is equals to scm block size. + + ozone.scm.chunk.size 16777216 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java index c8427f3950..fc4bedc6a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java @@ -1114,7 +1114,7 @@ public void testExpiredOpenKey() throws Exception { .getMetadataManager().getExpiredOpenKeys(); Assert.assertEquals(0, openKeys.size()); - Thread.sleep(2000); + //Thread.sleep(2000); openKeys = cluster.getKeySpaceManager().getMetadataManager() .getExpiredOpenKeys();