From 98d62e55c439c93a4a80513bd7bda5a3eec97fe2 Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Wed, 17 Jan 2018 01:09:48 +0530 Subject: [PATCH] Revert "HDFS-12794. Ozone: Parallelize ChunkOutputSream Writes to container. Contributed by Shashikant Banerjee." This reverts commit 6ce5ec676164b84a9e2f8dc65b5f2199a141506d. --- .../apache/hadoop/ozone/OzoneConfigKeys.java | 3 - .../client/io/ChunkGroupOutputStream.java | 54 +--- .../hadoop/ozone/client/rpc/RpcClient.java | 10 - .../hadoop/scm/storage/ChunkOutputStream.java | 257 +++++------------- .../scm/storage/ContainerProtocolCalls.java | 13 +- .../storage/DistributedStorageHandler.java | 10 - .../src/main/resources/ozone-default.xml | 9 - .../hadoop/ozone/ksm/TestKeySpaceManager.java | 2 +- 8 files changed, 85 insertions(+), 273 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index cb3f0f6193..8059b5eee9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -107,9 +107,6 @@ public final class OzoneConfigKeys { "ozone.scm.block.size.in.mb"; public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256; - public static final String OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB = - "ozone.output.stream.buffer.size.in.mb"; - public static final long OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT = 256; /** * Ozone administrator users delimited by comma. * If not set, only the user who launches an ozone service will be the diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index a44a00930a..fe248e367b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -19,10 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result; -import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfoGroup; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor; @@ -49,9 +46,6 @@ import java.util.ArrayList; import java.util.List; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT; - /** * Maintaining a list of ChunkInputStream. Write based on offset. * @@ -78,7 +72,6 @@ public class ChunkGroupOutputStream extends OutputStream { private final XceiverClientManager xceiverClientManager; private final int chunkSize; private final String requestID; - private final long streamBufferSize; /** * A constructor for testing purpose only. 
@@ -93,7 +86,6 @@ public ChunkGroupOutputStream() { xceiverClientManager = null; chunkSize = 0; requestID = null; - streamBufferSize = OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB; } /** @@ -113,26 +105,12 @@ public List getStreamEntries() { return streamEntries; } - /** - * Chunkoutput stream, making this package visible since this can be - * created only via builder. - * @param handler - Open Key state. - * @param xceiverClientManager - Communication Manager. - * @param scmClient - SCM protocol Client. - * @param ksmClient - KSM Protocol client - * @param chunkSize - Chunk Size - I/O - * @param requestId - Seed for trace ID generation. - * @param factor - Replication factor - * @param type - Replication Type - RATIS/Standalone etc. - * @param maxBufferSize - Maximum stream buffer Size. - * @throws IOException - Throws this exception if there is an error. - */ - ChunkGroupOutputStream( + public ChunkGroupOutputStream( OpenKeySession handler, XceiverClientManager xceiverClientManager, StorageContainerLocationProtocolClientSideTranslatorPB scmClient, KeySpaceManagerProtocolClientSideTranslatorPB ksmClient, int chunkSize, String requestId, ReplicationFactor factor, - ReplicationType type, long maxBufferSize) throws IOException { + ReplicationType type) throws IOException { this.streamEntries = new ArrayList<>(); this.currentStreamIndex = 0; this.byteOffset = 0; @@ -152,7 +130,6 @@ public List getStreamEntries() { this.requestID = requestId; LOG.debug("Expecting open key with one block, but got" + info.getKeyLocationVersions().size()); - this.streamBufferSize = maxBufferSize; } /** @@ -207,7 +184,7 @@ private void checkKeyLocationInfo(KsmKeyLocationInfo subKeyInfo) } streamEntries.add(new ChunkOutputStreamEntry(containerKey, keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID, - chunkSize, subKeyInfo.getLength(), this.streamBufferSize)); + chunkSize, subKeyInfo.getLength())); } @@ -347,7 +324,6 @@ public static class Builder { private String requestID; private ReplicationType type; private ReplicationFactor factor; - private long streamBufferSize; public Builder setHandler(OpenKeySession handler) { this.openHandler = handler; @@ -391,23 +367,9 @@ public Builder setFactor(ReplicationFactor replicationFactor) { return this; } - public Builder setStreamBufferSize(long blockSize) { - this.streamBufferSize = blockSize; - return this; - } - public ChunkGroupOutputStream build() throws IOException { - Preconditions.checkNotNull(openHandler); - Preconditions.checkNotNull(xceiverManager); - Preconditions.checkNotNull(scmClient); - Preconditions.checkNotNull(ksmClient); - Preconditions.checkState(chunkSize > 0); - Preconditions.checkState(StringUtils.isNotEmpty(requestID)); - Preconditions - .checkState(streamBufferSize > 0 && streamBufferSize > chunkSize); - return new ChunkGroupOutputStream(openHandler, xceiverManager, scmClient, - ksmClient, chunkSize, requestID, factor, type, streamBufferSize); + ksmClient, chunkSize, requestID, factor, type); } } @@ -423,12 +385,11 @@ private static class ChunkOutputStreamEntry extends OutputStream { private final long length; // the current position of this stream 0 <= currentPosition < length private long currentPosition; - private long streamBufferSize; // Max block size. 
ChunkOutputStreamEntry(String containerKey, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, String requestId, int chunkSize, - long length, long streamBufferSize) { + long length) { this.outputStream = null; this.containerKey = containerKey; this.key = key; @@ -439,7 +400,6 @@ private static class ChunkOutputStreamEntry extends OutputStream { this.length = length; this.currentPosition = 0; - this.streamBufferSize = streamBufferSize; } /** @@ -458,8 +418,6 @@ private static class ChunkOutputStreamEntry extends OutputStream { this.length = length; this.currentPosition = 0; - this.streamBufferSize = - OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT * OzoneConsts.MB; } long getLength() { @@ -474,7 +432,7 @@ private synchronized void checkStream() { if (this.outputStream == null) { this.outputStream = new ChunkOutputStream(containerKey, key, xceiverClientManager, xceiverClient, - requestId, chunkSize, Ints.checkedCast(streamBufferSize)); + requestId, chunkSize); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 20f2b54a2f..94038e2407 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -74,11 +74,6 @@ import java.util.UUID; import java.util.stream.Collectors; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB; -import static org.apache.hadoop.ozone.OzoneConfigKeys - .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT; - /** * Ozone RPC Client Implementation, it connects to KSM, SCM and DataNode * to execute client calls. This uses RPC protocol for communication @@ -99,7 +94,6 @@ public class RpcClient implements ClientProtocol { private final UserGroupInformation ugi; private final OzoneAcl.OzoneACLRights userRights; private final OzoneAcl.OzoneACLRights groupRights; - private final long streamBufferSize; /** * Creates RpcClient instance with the given configuration. @@ -154,9 +148,6 @@ public RpcClient(Configuration conf) throws IOException { } else { chunkSize = configuredChunkSize; } - // streamBufferSize by default is set equal to default scm block size. 
- streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB, - OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB; } @Override @@ -472,7 +463,6 @@ public OzoneOutputStream createKey( .setRequestID(requestId) .setType(OzoneProtos.ReplicationType.valueOf(type.toString())) .setFactor(OzoneProtos.ReplicationFactor.valueOf(factor.getValue())) - .setStreamBufferSize(streamBufferSize) .build(); groupOutputStream.addPreallocateBlocks( openKey.getKeyInfo().getLatestVersionLocations(), diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java index 916a506c47..64c10dac88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ChunkOutputStream.java @@ -18,32 +18,22 @@ package org.apache.hadoop.scm.storage; -import com.google.common.base.Preconditions; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; +import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.UUID; + import com.google.protobuf.ByteString; + import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ChunkInfo; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.KeyData; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.KeyValue; import org.apache.hadoop.scm.XceiverClientManager; import org.apache.hadoop.scm.XceiverClientSpi; -import org.apache.hadoop.util.Time; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - -import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos - .Result.SUCCESS; -import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.putKey; -import static org.apache.hadoop.scm.storage.ContainerProtocolCalls.writeChunk; - /** * An {@link OutputStream} used by the REST service in combination with the @@ -67,12 +57,12 @@ public class ChunkOutputStream extends OutputStream { private final String key; private final String traceID; private final KeyData.Builder containerKeyData; - private final String streamId; private XceiverClientManager xceiverClientManager; private XceiverClientSpi xceiverClient; private ByteBuffer buffer; + private final String streamId; + private int chunkIndex; private int chunkSize; - private int streamBufferSize; /** * Creates a new ChunkOutputStream. @@ -83,18 +73,14 @@ public class ChunkOutputStream extends OutputStream { * @param xceiverClient client to perform container calls * @param traceID container protocol call args * @param chunkSize chunk size - * @param maxBufferSize -- Controls the maximum amount of memory that we need - * to allocate data buffering. 
*/ public ChunkOutputStream(String containerKey, String key, XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient, - String traceID, int chunkSize, int maxBufferSize) { + String traceID, int chunkSize) { this.containerKey = containerKey; this.key = key; this.traceID = traceID; this.chunkSize = chunkSize; - this.streamBufferSize = maxBufferSize; - KeyValue keyValue = KeyValue.newBuilder() .setKey("TYPE").setValue("KEY").build(); this.containerKeyData = KeyData.newBuilder() @@ -103,24 +89,22 @@ public ChunkOutputStream(String containerKey, String key, .addMetadata(keyValue); this.xceiverClientManager = xceiverClientManager; this.xceiverClient = xceiverClient; - this.buffer = ByteBuffer.allocate(maxBufferSize); + this.buffer = ByteBuffer.allocate(chunkSize); this.streamId = UUID.randomUUID().toString(); + this.chunkIndex = 0; } - /** - * {@inheritDoc} - */ @Override public synchronized void write(int b) throws IOException { checkOpen(); - byte[] c = new byte[1]; - c[0] = (byte) b; - write(c, 0, 1); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put((byte)b); + if (buffer.position() == chunkSize) { + flushBufferToChunk(rollbackPosition, rollbackLimit); + } } - /** - * {@inheritDoc} - */ @Override public void write(byte[] b, int off, int len) throws IOException { if (b == null) { @@ -134,111 +118,26 @@ public void write(byte[] b, int off, int len) throws IOException { return; } checkOpen(); - int rollbackPosition = buffer.position(); - int rollbackLimit = buffer.limit(); - try { - List, ChunkInfo>> - writeFutures = writeInParallel(b, off, len); - // This is a rendezvous point for this function call, all chunk I/O - // for this block must complete before we can declare this call as - // complete. - - // Wait until all the futures complete or throws an exception if any of - // the calls ended with an exception this call will throw. - // if futures is null, it means that we wrote the data to the buffer and - // returned. - if (writeFutures != null) { - CompletableFuture.allOf(writeFutures.toArray(new - CompletableFuture[writeFutures.size()])).join(); - - // Wrote this data, we will clear this buffer now. - buffer.clear(); + while (len > 0) { + int writeLen = Math.min(chunkSize - buffer.position(), len); + int rollbackPosition = buffer.position(); + int rollbackLimit = buffer.limit(); + buffer.put(b, off, writeLen); + if (buffer.position() == chunkSize) { + flushBufferToChunk(rollbackPosition, rollbackLimit); } - } catch (InterruptedException | ExecutionException e) { - buffer.position(rollbackPosition); - buffer.limit(rollbackLimit); - throw new IOException("Unexpected error in write. ", e); + off += writeLen; + len -= writeLen; } } - /** - * Write a given block into many small chunks in parallel. - * - * @param b - * @param off - * @param len - * @throws IOException - * @throws ExecutionException - * @throws InterruptedException - */ - public List, ChunkInfo>> - writeInParallel(byte[] b, int off, int len) - throws IOException, ExecutionException, InterruptedException { - - Preconditions.checkArgument(len <= streamBufferSize, - "A chunk write cannot be " + "larger than max buffer size limit."); - long newBlockCount = len / chunkSize; - buffer.put(b, off, len); - List, ChunkInfo>> - writeFutures = new LinkedList<>(); - - // We if must have at least a chunkSize of data ready to write, if so we - // will go ahead and start writing that data. 
-    if (buffer.position() >= chunkSize) {
-      // Allocate new byte slices which will point to each chunk of data
-      // that we want to write. Divide the byte buffer into individual chunks
-      // each of length equals to chunkSize max where each chunk will be
-      // assigned a chunkId where, for each chunk the async write requests will
-      // be made and wait for all of them to return before the write call
-      // returns.
-      for (int chunkId = 0; chunkId < newBlockCount; chunkId++) {
-        // Please note : We are not flipping the slice when we write since
-        // the slices are pointing the buffer start and end as needed for
-        // the chunk write. Also please note, Duplicate does not create a
-        // copy of data, it only creates metadata that points to the data
-        // stream.
-        ByteBuffer chunk = buffer.duplicate();
-        Preconditions.checkState((chunkId * chunkSize) < buffer.limit(),
-            "Chunk offset cannot be beyond the limits of the buffer.");
-        chunk.position(chunkId * chunkSize);
-        // Min handles the case where the last block might be lesser than
-        // chunk Size.
-        chunk.limit(chunk.position() +
-            Math.min(chunkSize, chunk.remaining() - (chunkId * chunkSize)));
-
-        // Schedule all the writes, this is a non-block call which returns
-        // futures. We collect these futures and wait for all of them to
-        // complete in the next line.
-        writeFutures.add(writeChunkToContainer(chunk, 0, chunkSize));
-      }
-      return writeFutures;
-    }
-    // Nothing to do , return null.
-    return null;
-  }
-
   @Override
   public synchronized void flush() throws IOException {
     checkOpen();
     if (buffer.position() > 0) {
       int rollbackPosition = buffer.position();
       int rollbackLimit = buffer.limit();
-      ByteBuffer chunk = buffer.duplicate();
-      try {
-
-        ImmutablePair<CompletableFuture<ContainerProtos
-            .ContainerCommandResponseProto>, ChunkInfo>
-            result = writeChunkToContainer(chunk, 0, chunkSize);
-        updateChunkInfo(result);
-        buffer.clear();
-      } catch (ExecutionException | InterruptedException e) {
-        buffer.position(rollbackPosition);
-        buffer.limit(rollbackLimit);
-        throw new IOException("Failure in flush", e);
-      }
+      flushBufferToChunk(rollbackPosition, rollbackLimit);
     }
   }
 
@@ -248,20 +147,10 @@ public synchronized void close() throws IOException {
         buffer != null) {
       try {
         if (buffer.position() > 0) {
-          // This flip is needed since this is the real buffer to which we
-          // are writing and position will have moved each time we did a put.
-          buffer.flip();
-
-          // Call get immediately to make this call Synchronous.
-
-          ImmutablePair<CompletableFuture<ContainerProtos
-              .ContainerCommandResponseProto>, ChunkInfo>
-              result = writeChunkToContainer(buffer, 0, buffer.limit());
-          updateChunkInfo(result);
-          buffer.clear();
+          writeChunkToContainer();
         }
         putKey(xceiverClient, containerKeyData.build(), traceID);
-      } catch (IOException | InterruptedException | ExecutionException e) {
+      } catch (IOException e) {
         throw new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
       } finally {
@@ -274,24 +163,6 @@ public synchronized void close() throws IOException {
     }
   }
 
-  private void updateChunkInfo(
-      ImmutablePair<
-          CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
-          ChunkInfo
-          > result) throws InterruptedException, ExecutionException {
-    // Wait for this call to complete.
-    ContainerProtos.ContainerCommandResponseProto response =
-        result.getLeft().get();
-
-    // If the write call to the chunk is successful, we need to add that
-    // chunk information to the containerKeyData.
-    // TODO: Clean up the garbage in case of failure.
-    if(response.getResult() == SUCCESS) {
-      ChunkInfo chunk = result.getRight();
-      containerKeyData.addChunks(chunk);
-    }
-  }
-
   /**
    * Checks if the stream is open. If not, throws an exception.
    *
@@ -303,36 +174,54 @@ private synchronized void checkOpen() throws IOException {
     }
   }
 
+  /**
+   * Attempts to flush buffered writes by writing a new chunk to the container.
+   * If successful, then clears the buffer to prepare to receive writes for a
+   * new chunk.
+   *
+   * @param rollbackPosition position to restore in buffer if write fails
+   * @param rollbackLimit limit to restore in buffer if write fails
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  private synchronized void flushBufferToChunk(int rollbackPosition,
+      int rollbackLimit) throws IOException {
+    boolean success = false;
+    try {
+      writeChunkToContainer();
+      success = true;
+    } finally {
+      if (success) {
+        buffer.clear();
+      } else {
+        buffer.position(rollbackPosition);
+        buffer.limit(rollbackLimit);
+      }
+    }
+  }
+
   /**
    * Writes buffered data as a new chunk to the container and saves chunk
    * information to be used later in putKey call.
    *
-   * @param data -- Data to write.
-   * @param offset - offset to the data buffer
-   * @param len - Length in bytes
-   * @return Returns a Immutable pair -- A future object that will contian
-   *         the result of the operation, and the chunkInfo that we wrote.
-   *
-   * @throws IOException
-   * @throws ExecutionException
-   * @throws InterruptedException
+   * @throws IOException if there is an I/O error while performing the call
    */
-  private ImmutablePair<
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>,
-      ChunkInfo>
-      writeChunkToContainer(ByteBuffer data, int offset, int len)
-      throws IOException, ExecutionException, InterruptedException {
-
-
-    ByteString dataString = ByteString.copyFrom(data);
-    ChunkInfo chunk = ChunkInfo.newBuilder().setChunkName(
+  private synchronized void writeChunkToContainer() throws IOException {
+    buffer.flip();
+    ByteString data = ByteString.copyFrom(buffer);
+    ChunkInfo chunk = ChunkInfo
+        .newBuilder()
+        .setChunkName(
             DigestUtils.md5Hex(key) + "_stream_"
-                + streamId + "_chunk_" + Time.monotonicNowNanos())
+            + streamId + "_chunk_" + ++chunkIndex)
         .setOffset(0)
-        .setLen(len)
+        .setLen(data.size())
         .build();
-    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response =
-        writeChunk(xceiverClient, chunk, key, dataString, traceID);
-    return new ImmutablePair(response, chunk);
+    try {
+      writeChunk(xceiverClient, chunk, key, data, traceID);
+    } catch (IOException e) {
+      throw new IOException(
+          "Unexpected Storage Container Exception: " + e.toString(), e);
+    }
+    containerKeyData.addChunks(chunk);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
index 7d4c72d4e1..1cde67cabb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
@@ -53,9 +53,6 @@
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
 import org.apache.hadoop.scm.XceiverClientSpi;
 
 /**
@@ -165,10 +162,9 @@ public static ReadChunkResponseProto readChunk(XceiverClientSpi xceiverClient,
    * @param traceID container protocol call args
    * @throws IOException if there is an I/O error while performing the call
    */
-  public static CompletableFuture<ContainerCommandResponseProto> writeChunk(
-      XceiverClientSpi xceiverClient, ChunkInfo chunk, String key,
-      ByteString data, String traceID)
-      throws IOException, ExecutionException, InterruptedException {
+  public static void writeChunk(XceiverClientSpi xceiverClient, ChunkInfo chunk,
+      String key, ByteString data, String traceID)
+      throws IOException {
     WriteChunkRequestProto.Builder writeChunkRequest = WriteChunkRequestProto
         .newBuilder()
         .setPipeline(xceiverClient.getPipeline().getProtobufMessage())
@@ -183,7 +179,8 @@ public static CompletableFuture<ContainerCommandResponseProto> writeChunk(
         .setDatanodeID(id)
         .setWriteChunk(writeChunkRequest)
         .build();
-    return xceiverClient.sendCommandAsync(request);
+    ContainerCommandResponseProto response = xceiverClient.sendCommand(request);
+    validateContainerResponse(response);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 137f8f95d4..1830c71597 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -67,11 +67,6 @@
 import java.io.OutputStream;
 import java.util.List;
 
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT;
-
 /**
  * A {@link StorageHandler} implementation that distributes object storage
  * across the nodes of an HDFS cluster.
@@ -91,7 +86,6 @@ public final class DistributedStorageHandler implements StorageHandler {
   private final boolean useRatis;
   private final OzoneProtos.ReplicationType type;
   private final OzoneProtos.ReplicationFactor factor;
-  private final long streamBufferSize;
 
   /**
    * Creates a new DistributedStorageHandler.
@@ -133,9 +127,6 @@ public DistributedStorageHandler(OzoneConfiguration conf,
           chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
       chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
     }
-    // streamBufferSize by default is set to default scm block size.
-    streamBufferSize = conf.getLong(OZONE_OUTPUT_STREAM_BUFFER_SIZE_IN_MB,
-        OZONE_OUTPUT_STREAM_BUFFER_SIZE_DEFAULT) * OzoneConsts.MB;
   }
 
   @Override
@@ -427,7 +418,6 @@ public OutputStream newKeyWriter(KeyArgs args) throws IOException,
         .setRequestID(args.getRequestID())
         .setType(xceiverClientManager.getType())
         .setFactor(xceiverClientManager.getFactor())
-        .setStreamBufferSize(streamBufferSize)
         .build();
     groupOutputStream.addPreallocateBlocks(
         openKey.getKeyInfo().getLatestVersionLocations(),
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 4df99f92d6..31c3901a95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -690,15 +690,6 @@
       Ozone block size.
     </description>
   </property>
-  <property>
-    <name>ozone.output.stream.buffer.size.in.mb</name>
-    <value>256</value>
-    <tag>OZONE</tag>
-    <description>
-      The maximum size of the buffer allocated for the ozone output stream for
-      write. Default size is equals to scm block size.
-    </description>
-  </property>
   <property>
     <name>ozone.scm.chunk.size</name>
     <value>16777216</value>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
index fc4bedc6a3..c8427f3950 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestKeySpaceManager.java
@@ -1114,7 +1114,7 @@ public void testExpiredOpenKey() throws Exception {
         .getMetadataManager().getExpiredOpenKeys();
     Assert.assertEquals(0, openKeys.size());
 
-    //Thread.sleep(2000);
+    Thread.sleep(2000);
     openKeys = cluster.getKeySpaceManager().getMetadataManager()
         .getExpiredOpenKeys();